Skip to content

Commit

Permalink
Add database and some other commands in pipeline (#2832)
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Jan 20, 2022
1 parent 0f4bed9 commit 49bf437
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 9 deletions.
73 changes: 71 additions & 2 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.json.JSONArray;

import redis.clients.jedis.args.*;
import redis.clients.jedis.commands.DatabasePipelineCommands;
import redis.clients.jedis.commands.PipelineBinaryCommands;
import redis.clients.jedis.commands.PipelineCommands;
import redis.clients.jedis.commands.ProtocolCommand;
Expand All @@ -25,8 +26,8 @@
import redis.clients.jedis.search.aggr.AggregationBuilder;
import redis.clients.jedis.search.aggr.AggregationResult;

public class Pipeline extends Queable implements PipelineCommands, PipelineBinaryCommands,
RedisModulePipelineCommands, Closeable {
public class Pipeline extends Queable implements PipelineCommands, PipelineBinaryCommands,
DatabasePipelineCommands, RedisModulePipelineCommands, Closeable {

protected final Connection connection;
// private final Jedis jedis;
Expand Down Expand Up @@ -3330,6 +3331,74 @@ public Response<Long> waitReplicas(int replicas, long timeout) {
return appendCommand(commandObjects.waitReplicas(replicas, timeout));
}

public Response<List<String>> time() {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.TIME), BuilderFactory.STRING_LIST));
}

@Override
public Response<String> select(final int index) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.SELECT), BuilderFactory.STRING));
}

@Override
public Response<Long> dbSize() {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.DBSIZE), BuilderFactory.LONG));
}

@Override
public Response<String> swapDB(final int index1, final int index2) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.SWAPDB)
.add(index1).add(index2), BuilderFactory.STRING));
}

@Override
public Response<Long> move(String key, int dbIndex) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MOVE)
.key(key).add(dbIndex), BuilderFactory.LONG));
}

@Override
public Response<Long> move(final byte[] key, final int dbIndex) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MOVE)
.key(key).add(dbIndex), BuilderFactory.LONG));
}

@Override
public Response<Boolean> copy(String srcKey, String dstKey, int db, boolean replace) {
return appendCommand(commandObjects.copy(srcKey, dstKey, db, replace));
}

@Override
public Response<Boolean> copy(byte[] srcKey, byte[] dstKey, int db, boolean replace) {
return appendCommand(commandObjects.copy(srcKey, dstKey, db, replace));
}

@Override
public Response<String> migrate(String host, int port, byte[] key, int destinationDB, int timeout) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MIGRATE)
.add(host).add(port).key(key).add(destinationDB).add(timeout), BuilderFactory.STRING));
}

@Override
public Response<String> migrate(String host, int port, String key, int destinationDB, int timeout) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MIGRATE)
.add(host).add(port).key(key).add(destinationDB).add(timeout), BuilderFactory.STRING));
}

@Override
public Response<String> migrate(String host, int port, int destinationDB, int timeout, MigrateParams params, byte[]... keys) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MIGRATE)
.add(host).add(port).add(new byte[0]).add(destinationDB).add(timeout).addParams(params)
.add(Protocol.Keyword.KEYS).keys((Object[]) keys), BuilderFactory.STRING));
}

@Override
public Response<String> migrate(String host, int port, int destinationDB, int timeout, MigrateParams params, String... keys) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MIGRATE)
.add(host).add(port).add(new byte[0]).add(destinationDB).add(timeout).addParams(params)
.add(Protocol.Keyword.KEYS).keys((Object[]) keys), BuilderFactory.STRING));
}

public Response<Object> sendCommand(ProtocolCommand cmd, String... args) {
return sendCommand(new CommandArguments(cmd).addObjects((Object[]) args));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package redis.clients.jedis.commands;

import redis.clients.jedis.Response;
import redis.clients.jedis.params.MigrateParams;

public interface DatabasePipelineCommands {

Response<String> select(int index);

Response<Long> dbSize();

Response<String> swapDB(int index1, int index2);

Response<Long> move(String key, int dbIndex);

Response<Long> move(byte[] key, int dbIndex);

Response<Boolean> copy(String srcKey, String dstKey, int db, boolean replace);

Response<Boolean> copy(byte[] srcKey, byte[] dstKey, int db, boolean replace);

Response<String> migrate(String host, int port, byte[] key, int destinationDB, int timeout);

Response<String> migrate(String host, int port, int destinationDB, int timeout, MigrateParams params, byte[]... keys);

Response<String> migrate(String host, int port, String key, int destinationDB, int timeout);

Response<String> migrate(String host, int port, int destinationDB, int timeout, MigrateParams params, String... keys);

}
14 changes: 7 additions & 7 deletions src/test/java/redis/clients/jedis/PipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ private void verifyHasBothValues(byte[] firstKey, byte[] secondKey, byte[] value
assertTrue(Arrays.equals(firstKey, value1) || Arrays.equals(firstKey, value2));
assertTrue(Arrays.equals(secondKey, value1) || Arrays.equals(secondKey, value2));
}
//
// @Test
// public void pipelineSelect() {
// Pipeline p = jedis.pipelined();
// p.select(1);
// p.sync();
// }

@Test
public void pipelineSelect() {
Pipeline p = jedis.pipelined();
p.select(1);
p.sync();
}

@Test
public void pipelineResponseWithoutData() {
Expand Down

0 comments on commit 49bf437

Please sign in to comment.