Skip to content

Commit

Permalink
Fix PENDING entries of xinfoStreamFull method (#2988)
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 authored Apr 29, 2022
1 parent c63a9b1 commit 76764b9
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,7 @@ private Map<String, Builder> createDecoderMap() {
tempMappingFunctions.put(StreamConsumerFullInfo.NAME, STRING);
tempMappingFunctions.put(StreamConsumerFullInfo.SEEN_TIME, LONG);
tempMappingFunctions.put(StreamConsumerFullInfo.PEL_COUNT, LONG);
tempMappingFunctions.put(StreamConsumerFullInfo.PENDING, LONG_LIST);
tempMappingFunctions.put(StreamConsumerFullInfo.PENDING, ENCODED_OBJECT_LIST);

return tempMappingFunctions;
}
Expand Down Expand Up @@ -1354,7 +1354,7 @@ private Map<String, Builder> createDecoderMap() {
Map<String, Builder> tempMappingFunctions = new HashMap<>();
tempMappingFunctions.put(StreamGroupFullInfo.NAME, STRING);
tempMappingFunctions.put(StreamGroupFullInfo.CONSUMERS, STREAM_CONSUMER_FULL_INFO_LIST);
tempMappingFunctions.put(StreamGroupFullInfo.PENDING, STRING_LIST);
tempMappingFunctions.put(StreamGroupFullInfo.PENDING, ENCODED_OBJECT_LIST);
tempMappingFunctions.put(StreamGroupFullInfo.LAST_DELIVERED, STREAM_ENTRY_ID);
tempMappingFunctions.put(StreamGroupFullInfo.PEL_COUNT, LONG);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import redis.clients.jedis.StreamEntryID;

/**
* This class holds information about a stream consumer with command <code>xinfo stream mystream full<code/>.
Expand All @@ -19,16 +20,18 @@ public class StreamConsumerFullInfo implements Serializable {
private final String name;
private final Long seenTime;
private final Long pelCount;
private final List<Long> pending;
private final List<List<Object>> pending;
private final Map<String, Object> consumerInfo;

@SuppressWarnings("unchecked")
public StreamConsumerFullInfo(Map<String, Object> map) {
consumerInfo = map;
name = (String) map.get(NAME);
seenTime = (Long) map.get(SEEN_TIME);
pending = (List<Long>) map.get(PENDING);
pending = (List<List<Object>>) map.get(PENDING);
pelCount = (Long) map.get(PEL_COUNT);

pending.stream().forEach(entry -> entry.set(0, new StreamEntryID((String) entry.get(0))));
}

public String getName() {
Expand All @@ -43,7 +46,7 @@ public Long getPelCount() {
return pelCount;
}

public List<Long> getPending() {
public List<List<Object>> getPending() {
return pending;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class StreamGroupFullInfo implements Serializable {

private final String name;
private final List<StreamConsumerFullInfo> consumers;
private final List<String> pending;
private final List<List<Object>> pending;
private final Long pelCount;
private final StreamEntryID lastDeliveredId;
private final Map<String, Object> groupFullInfo;
Expand All @@ -35,10 +35,11 @@ public StreamGroupFullInfo(Map<String, Object> map) {
groupFullInfo = map;
name = (String) map.get(NAME);
consumers = (List<StreamConsumerFullInfo>) map.get(CONSUMERS);
pending = (List<String>) map.get(PENDING);
pending = (List<List<Object>>) map.get(PENDING);
lastDeliveredId = (StreamEntryID) map.get(LAST_DELIVERED);
pelCount = (Long) map.get(PEL_COUNT);

pending.stream().forEach(entry -> entry.set(0, new StreamEntryID((String) entry.get(0))));
}

public String getName() {
Expand All @@ -49,7 +50,7 @@ public List<StreamConsumerFullInfo> getConsumers() {
return consumers;
}

public List<String> getPending() {
public List<List<Object>> getPending() {
return pending;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,38 @@ public void xinfo() throws InterruptedException {
} catch (JedisException e) {
assertEquals("ERR no such key", e.getMessage());
}
}

@Test
public void xinfoStreamFullWithPending() {

Map<String, String> map = singletonMap("f1", "v1");
StreamEntryID id1 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false);

Map<String, StreamEntryID> streamQeury1 = singletonMap("streamfull2", StreamEntryID.UNRECEIVED_ENTRY);
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(1), streamQeury1);
assertEquals(1, range.size());
assertEquals(1, range.get(0).getValue().size());

StreamFullInfo full = jedis.xinfoStreamFull("streamfull2");
assertEquals(1, full.getGroups().size());
StreamGroupFullInfo group = full.getGroups().get(0);
assertEquals("xreadGroup-group", group.getName());

assertEquals(1, group.getPending().size());
List<Object> groupPendingEntry = group.getPending().get(0);
assertEquals(id1, groupPendingEntry.get(0));
assertEquals("xreadGroup-consumer", groupPendingEntry.get(1));

assertEquals(1, group.getConsumers().size());
StreamConsumerFullInfo consumer = group.getConsumers().get(0);
assertEquals("xreadGroup-consumer", consumer.getName());
assertEquals(1, consumer.getPending().size());
List<Object> consumerPendingEntry = consumer.getPending().get(0);
assertEquals(id1, consumerPendingEntry.get(0));
}

@Test
Expand Down

0 comments on commit 76764b9

Please sign in to comment.