Skip to content

Commit

Permalink
xreadgroup command test
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Mar 27, 2024
1 parent 9140c3d commit 7e54283
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,10 @@ public void xreadWithParams() {
@Test
public void xreadAsMap() {

final String key1 = "xread-stream1";
final String key2 = "xread-stream2";
final String stream1 = "xread-stream1";
final String stream2 = "xread-stream2";

Map<String, StreamEntryID> streamQeury1 = singletonMap(key1, new StreamEntryID());
Map<String, StreamEntryID> streamQeury1 = singletonMap(stream1, new StreamEntryID());

// Before creating Stream
assertNull(jedis.xreadAsMap(XReadParams.xReadParams().block(1), streamQeury1));
Expand All @@ -326,14 +326,14 @@ public void xreadAsMap() {
StreamEntryID id2 = new StreamEntryID(2);
StreamEntryID id3 = new StreamEntryID(3);

assertEquals(id1, jedis.xadd(key1, id1, map));
assertEquals(id2, jedis.xadd(key2, id2, map));
assertEquals(id3, jedis.xadd(key1, id3, map));
assertEquals(id1, jedis.xadd(stream1, id1, map));
assertEquals(id2, jedis.xadd(stream2, id2, map));
assertEquals(id3, jedis.xadd(stream1, id3, map));

// Read only a single Stream
Map<String, List<StreamEntry>> streams1 = jedis.xreadAsMap(XReadParams.xReadParams().count(2), streamQeury1);
assertEquals(singleton(key1), streams1.keySet());
List<StreamEntry> list1 = streams1.get(key1);
assertEquals(singleton(stream1), streams1.keySet());
List<StreamEntry> list1 = streams1.get(stream1);
assertEquals(2, list1.size());
assertEquals(id1, list1.get(0).getID());
assertEquals(map, list1.get(0).getFields());
Expand All @@ -342,20 +342,20 @@ public void xreadAsMap() {

// Read from two Streams
Map<String, StreamEntryID> streamQuery2 = new LinkedHashMap<>();
streamQuery2.put(key1, new StreamEntryID());
streamQuery2.put(key2, new StreamEntryID());
streamQuery2.put(stream1, new StreamEntryID());
streamQuery2.put(stream2, new StreamEntryID());
Map<String, List<StreamEntry>> streams2 = jedis.xreadAsMap(XReadParams.xReadParams().count(1), streamQuery2);
assertEquals(2, streams2.size());
assertEquals(id1, streams2.get(key1).get(0).getID());
assertEquals(id2, streams2.get(key2).get(0).getID());
assertEquals(id1, streams2.get(stream1).get(0).getID());
assertEquals(id2, streams2.get(stream2).get(0).getID());

// Read from last entry
Map<String, StreamEntryID> streamQueryLE = singletonMap(key1, StreamEntryID.XREAD_LAST_ENTRY);
Map<String, StreamEntryID> streamQueryLE = singletonMap(stream1, StreamEntryID.XREAD_LAST_ENTRY);
Map<String, List<StreamEntry>> streamsLE = jedis.xreadAsMap(XReadParams.xReadParams().count(1), streamQueryLE);
assertEquals(singleton(key1), streamsLE.keySet());
assertEquals(1, streamsLE.get(key1).size());
assertEquals(id3, streamsLE.get(key1).get(0).getID());
assertEquals(map, streamsLE.get(key1).get(0).getFields());
assertEquals(singleton(stream1), streamsLE.keySet());
assertEquals(1, streamsLE.get(stream1).size());
assertEquals(id3, streamsLE.get(stream1).get(0).getID());
assertEquals(map, streamsLE.get(stream1).get(0).getFields());
}

@Test
Expand Down Expand Up @@ -499,9 +499,8 @@ public void xreadGroupWithParams() {
jedis.xgroupCreate("xreadGroup-stream2", "xreadGroup-group", null, false);

// Read only a single Stream
Map<String, StreamEntryID> streamQeury11 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Entry<String, List<StreamEntry>>> streams1 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQeury11);
XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQeury1);
assertEquals(1, streams1.size());
assertEquals(1, streams1.get(0).getValue().size());

Expand All @@ -516,10 +515,28 @@ public void xreadGroupWithParams() {
// Read only fresh messages
StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map);
Map<String, StreamEntryID> streamQeuryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Entry<String, List<StreamEntry>>> streams3 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
List<Entry<String, List<StreamEntry>>> streamsFresh = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQeuryFresh);
assertEquals(1, streams3.size());
assertEquals(id4, streams3.get(0).getValue().get(0).getID());
assertEquals(1, streamsFresh.size());
assertEquals(id4, streamsFresh.get(0).getValue().get(0).getID());
}

@Test
public void xreadGroupAsMap() {

final String stream1 = "xreadGroup-stream1";
Map<String, String> map = singletonMap("f1", "v1");

StreamEntryID id1 = jedis.xadd(stream1, StreamEntryID.NEW_ENTRY, map);
jedis.xgroupCreate(stream1, "xreadGroup-group", null, false);
Map<String, StreamEntryID> streamQeury1 = singletonMap(stream1, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
Map<String, List<StreamEntry>> range = jedis.xreadGroupAsMap("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().noAck(), streamQeury1);
assertEquals(singleton(stream1), range.keySet());
List<StreamEntry> list = range.get(stream1);
assertEquals(1, list.size());
assertEquals(id1, list.get(0).getID());
assertEquals(map, list.get(0).getFields());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,10 @@ public void xrangeExclusive() {
@Test
public void xreadWithParams() {

final String key1 = "xread-stream1";
final String key2 = "xread-stream2";
final String stream1 = "xread-stream1";
final String stream2 = "xread-stream2";

Map<String, StreamEntryID> streamQuery1 = singletonMap(key1, new StreamEntryID());
Map<String, StreamEntryID> streamQuery1 = singletonMap(stream1, new StreamEntryID());

// Before creating Stream
pipe.xread(XReadParams.xReadParams().block(1), streamQuery1);
Expand All @@ -412,25 +412,25 @@ public void xreadWithParams() {
));

Map<String, String> map1 = singletonMap("f1", "v1");
StreamEntryID id1 = jedis.xadd(key1, (StreamEntryID) null, map1);
StreamEntryID id1 = jedis.xadd(stream1, (StreamEntryID) null, map1);

Map<String, String> map2 = singletonMap("f2", "v2");
StreamEntryID id2 = jedis.xadd(key2, (StreamEntryID) null, map2);
StreamEntryID id2 = jedis.xadd(stream2, (StreamEntryID) null, map2);

// Read only a single Stream
Response<List<Entry<String, List<StreamEntry>>>> streams1 =
pipe.xread(XReadParams.xReadParams().count(1).block(1), streamQuery1);

Response<List<Entry<String, List<StreamEntry>>>> streams2 =
pipe.xread(XReadParams.xReadParams().block(1), singletonMap(key1, id1));
pipe.xread(XReadParams.xReadParams().block(1), singletonMap(stream1, id1));

Response<List<Entry<String, List<StreamEntry>>>> streams3 =
pipe.xread(XReadParams.xReadParams(), singletonMap(key1, id1));
pipe.xread(XReadParams.xReadParams(), singletonMap(stream1, id1));

pipe.sync();

assertThat(streams1.get().stream().map(Entry::getKey).collect(Collectors.toList()),
contains(key1));
contains(stream1));

assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream)
.map(StreamEntry::getID).collect(Collectors.toList()), contains(id1));
Expand All @@ -444,16 +444,16 @@ public void xreadWithParams() {

// Read from two Streams
Map<String, StreamEntryID> streamQuery2 = new LinkedHashMap<>();
streamQuery2.put(key1, new StreamEntryID());
streamQuery2.put(key2, new StreamEntryID());
streamQuery2.put(stream1, new StreamEntryID());
streamQuery2.put(stream2, new StreamEntryID());

Response<List<Entry<String, List<StreamEntry>>>> streams4 =
pipe.xread(XReadParams.xReadParams().count(2).block(1), streamQuery2);

pipe.sync();

assertThat(streams4.get().stream().map(Entry::getKey).collect(Collectors.toList()),
contains(key1, key2));
contains(stream1, stream2));

assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream)
.map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2));
Expand Down

0 comments on commit 7e54283

Please sign in to comment.