Skip to content

Commit

Permalink
Partial merge from vachaPOC
Browse files Browse the repository at this point in the history
Signed-off-by: Finn Carroll <carrofin@amazon.com>
  • Loading branch information
finnegancarroll committed Sep 16, 2024
1 parent e3bbc74 commit 0e58640
Show file tree
Hide file tree
Showing 101 changed files with 15,487 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.common.io.stream;

import java.io.IOException;
import java.io.OutputStream;

/**
* Implementers can be written to write to output and read from input using Protobuf.
*
* @opensearch.internal
*/
public interface ProtobufWriteable {

/**
* Write this into the stream output.
*/
public void writeTo(OutputStream out) throws IOException;

/**
* Reference to a method that can write some object to a {@link OutputStream}.
* Most classes should implement {@link ProtobufWriteable} and the {@link ProtobufWriteable#writeTo(OutputStream)} method should <em>use</em>
* {@link OutputStream} methods directly or this indirectly:
* <pre><code>
* public void writeTo(OutputStream out) throws IOException {
* out.writeVInt(someValue);
* }
* </code></pre>
*/
@FunctionalInterface
interface Writer<V> {

/**
* Write {@code V}-type {@code value} to the {@code out}put stream.
*
* @param out Output to write the {@code value} too
* @param value The value to add
*/
void write(OutputStream out, V value) throws IOException;

}

/**
* Reference to a method that can read some object from a stream. By convention this is a constructor that takes
* {@linkplain byte[]} as an argument for most classes and a static method for things like enums.
* <pre><code>
* public MyClass(final byte[] in) throws IOException {
* this.someValue = in.readVInt();
* }
* </code></pre>
*/
@FunctionalInterface
interface Reader<V> {

/**
* Read {@code V}-type value from a stream.
*
* @param in Input to read the value from
*/
V read(byte[] in) throws IOException;

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.core.common.io.stream.ProtobufWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportException;
import org.opensearch.core.transport.TransportResponse;

import java.io.IOException;
import java.util.Objects;

/**
* A simple base class for action response listeners, defaulting to using the SAME executor (as its
* very common on response handlers).
*
* @opensearch.api
*/
public class ProtobufActionListenerResponseHandler<Response extends TransportResponse> implements TransportResponseHandler<Response> {

private final ActionListener<? super Response> listener;
private final ProtobufWriteable.Reader<Response> reader;
private final String executor;

public ProtobufActionListenerResponseHandler(
ActionListener<? super Response> listener,
ProtobufWriteable.Reader<Response> reader,
String executor
) {
this.listener = Objects.requireNonNull(listener);
this.reader = Objects.requireNonNull(reader);
this.executor = Objects.requireNonNull(executor);
}

public ProtobufActionListenerResponseHandler(ActionListener<? super Response> listener, ProtobufWriteable.Reader<Response> reader) {
this(listener, reader, ThreadPool.Names.SAME);
}

@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException e) {
listener.onFailure(e);
}

@Override
public String executor() {
return executor;
}

@Override
public String toString() {
return super.toString() + "/" + listener;
}

@Override
public Response read(StreamInput in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}

@Override
public Response read(byte[] in) throws IOException {
return reader.read(in);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.io.OutputStream;

/**
* Base action request implemented by plugins.
*
* @opensearch.api
*/
public abstract class ProtobufActionRequest extends TransportRequest {

public ProtobufActionRequest() {
super();
// this does not set the listenerThreaded API, if needed, its up to the caller to set it
// since most times, we actually want it to not be threaded...
// this.listenerThreaded = request.listenerThreaded();
}

public ProtobufActionRequest(byte[] in) throws IOException {
super(in);
}

public abstract ActionRequestValidationException validate();

/**
* Should this task store its result after it has finished?
*/
public boolean getShouldStoreResult() {
return false;
}

@Override
public void writeTo(OutputStream out) throws IOException {
super.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.client.ProtobufOpenSearchClient;
import org.opensearch.common.unit.TimeValue;

import java.util.Objects;

/**
* Base Action Request Builder
*
* @opensearch.api
*/
public abstract class ProtobufActionRequestBuilder<Request extends ProtobufActionRequest, Response extends ProtobufActionResponse> {

protected final ProtobufActionType<Response> action;
protected final Request request;
protected final ProtobufOpenSearchClient client;

protected ProtobufActionRequestBuilder(ProtobufOpenSearchClient client, ProtobufActionType<Response> action, Request request) {
Objects.requireNonNull(action, "action must not be null");
this.action = action;
this.request = request;
this.client = client;
}

public Request request() {
return this.request;
}

public ActionFuture<Response> execute() {
return client.execute(action, request);
}

/**
* Short version of execute().actionGet().
*/
public Response get() {
return execute().actionGet();
}

/**
* Short version of execute().actionGet().
*/
public Response get(TimeValue timeout) {
return execute().actionGet(timeout);
}

/**
* Short version of execute().actionGet().
*/
public Response get(String timeout) {
return execute().actionGet(timeout);
}

public void execute(ActionListener<Response> listener) {
client.execute(action, request, listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.core.transport.TransportResponse;

import java.io.IOException;

/**
* Base class for responses to action requests implemented by plugins.
*
* @opensearch.api
*/
public abstract class ProtobufActionResponse extends TransportResponse {

public ProtobufActionResponse() {}

public ProtobufActionResponse(byte[] in) throws IOException {
super(in);
}
}
64 changes: 64 additions & 0 deletions server/src/main/java/org/opensearch/action/ProtobufActionType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.ProtobufWriteable;
import org.opensearch.transport.TransportRequestOptions;

/**
* A generic action. Should strive to make it a singleton.
*
* @opensearch.api
*/
public class ProtobufActionType<Response extends ProtobufActionResponse> {

private final String name;
private final ProtobufWriteable.Reader<Response> responseReader;

/**
* @param name The name of the action, must be unique across actions.
* @param responseReader A reader for the response type
*/
public ProtobufActionType(String name, ProtobufWriteable.Reader<Response> responseReader) {
this.name = name;
this.responseReader = responseReader;
}

/**
* The name of the action. Must be unique across actions.
*/
public String name() {
return this.name;
}

/**
* Get a reader that can create a new instance of the class from a {@link byte[]}
*/
public ProtobufWriteable.Reader<Response> getResponseReaderTry() {
return responseReader;
}

/**
* Optional request options for the action.
*/
public TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}

@Override
public boolean equals(Object o) {
return o instanceof ProtobufActionType && name.equals(((ProtobufActionType<?>) o).name());
}

@Override
public int hashCode() {
return name.hashCode();
}
}
Loading

0 comments on commit 0e58640

Please sign in to comment.