Skip to content

Commit

Permalink
Fixed issue in FRAG4 with incorrect size() (https://issues.redhat.com…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Sep 1, 2020
1 parent 74bc8f8 commit 45c86c3
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 47 deletions.
16 changes: 16 additions & 0 deletions src/org/jgroups/FragmentedMessage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jgroups;

import org.jgroups.util.ByteArrayDataOutputStream;
import org.jgroups.util.PartialOutputStream;

import java.io.DataInput;
Expand Down Expand Up @@ -37,9 +38,24 @@ public FragmentedMessage(Message original_msg, int off, int len) {
protected int sizeOfPayload() {return Global.INT_SIZE + length;}

public void writePayload(DataOutput out) throws IOException {
ByteArrayDataOutputStream bos=out instanceof ByteArrayDataOutputStream? (ByteArrayDataOutputStream)out : null;
int size_pos=bos != null? bos.position() : -1;
out.writeInt(length);
PartialOutputStream pos=new PartialOutputStream(out, offset, length);

int prev_pos=bos != null? bos.position() : -1;
original_msg.writeTo(pos);
int last_pos=bos != null? bos.position() : -1;
int written=last_pos-prev_pos;

// if we have a ByteArrayDataOutputStream *and* the number of bytes written doesn't correspond with the length,
// then fix the length in the output stream (https://issues.redhat.com/browse/JGRP-2289)
if(bos != null && written != length) {
int current_pos=bos.position();
bos.position(size_pos);
bos.writeInt(written);
bos.position(current_pos);
}
}

public void readPayload(DataInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public IncorrectSizeObject(int size) {
}

@Override public int serializedSize() {
return buf.length + 200 + Global.INT_SIZE; // incorrect, should be 1000
return buf.length + 200 + Global.INT_SIZE; // incorrect, should be buf.length + 4
}

@Override
Expand All @@ -127,5 +127,10 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
buf=new byte[in.readInt()];
in.readFully(buf);
}

@Override
public String toString() {
return String.format("%d bytes", buf != null? buf.length : 0);
}
}
}
82 changes: 36 additions & 46 deletions tests/junit-functional/org/jgroups/tests/ObjectMessageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ObjectMessage;
import org.jgroups.util.*;
import org.jgroups.protocols.FRAG4;
import org.jgroups.protocols.Fragmentation;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.MyReceiver;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

Expand Down Expand Up @@ -137,10 +138,11 @@ public void testSize() {

// https://issues.redhat.com/browse/JGRP-2285
public void testIncorrectSize() throws Exception {
try(JChannel a=create("A", "test-size");
JChannel b=create("B", "test-size");) {
try(JChannel a=create("A");
JChannel b=create("B");) {
a.connect("test-size");
b.connect("test-size");
Util.waitUntilAllChannelsHaveSameView(10000, 500, a,b);

IncorrectSizeObject obj=new IncorrectSizeObject(1000);
MyReceiver<IncorrectSizeObject> r=new MyReceiver<>();
b.setReceiver(r);
Expand All @@ -152,49 +154,37 @@ public void testIncorrectSize() throws Exception {
}
}

protected static JChannel create(String name, String cluster) throws Exception {
return new JChannel(Util.getTestStack()).setName(name).connect(cluster);
}


protected static class BasePerson implements Streamable {
protected int age;
protected String name;

public BasePerson() {
}

public BasePerson(int age, String name) {
this.age=age;
this.name=name;
}

public void writeTo(DataOutput out) throws IOException {
out.writeInt(age);
Bits.writeString(name, out);
}

public void readFrom(DataInput in) throws IOException {
age=in.readInt();
name=Bits.readString(in);
}

public String toString() {
return String.format("name=%s, age=%d", name, age);
// https://issues.redhat.com/browse/JGRP-2289
public void testIncorrectSizeWithFRAG4() throws Exception {
try(JChannel a=create("A");
JChannel b=create("B")) {
setFRAG4(500, a,b); // replaces any existing fragmentation protocol with FRAG4
a.connect("test-size");
b.connect("test-size");
Util.waitUntilAllChannelsHaveSameView(10000, 500, a,b);
IncorrectSizeObject obj=new IncorrectSizeObject(1000);
MyReceiver<IncorrectSizeObject> r=new MyReceiver<>();
b.setReceiver(r);
Message msg=new ObjectMessage(b.getAddress(), obj);
a.send(msg);
Util.waitUntil(10000, 250, () -> r.size() > 0);
IncorrectSizeObject obj2=r.list().get(0);
assert obj2 != null && obj2.buf.length == 1000;
}
}

protected static class Person extends BasePerson implements SizeStreamable {

public Person() {
}

public Person(int age, String name) {
super(age, name);
}
protected static JChannel create(String name) throws Exception {
return new JChannel(Util.getTestStack()).setName(name);
}

public int serializedSize() {
return Global.INT_SIZE + Bits.size(name);
// removes any existing fragmentation protocols (if any) with FRAG4
protected static void setFRAG4(int frag_size, JChannel... channels) throws Exception {
for(JChannel c: channels) {
ProtocolStack stack=c.getProtocolStack();
stack.removeProtocols(Fragmentation.class);
FRAG4 f=new FRAG4().setFragSize(frag_size);
stack.insertProtocolAtTop(f);
f.init();
}
}

Expand Down

0 comments on commit 45c86c3

Please sign in to comment.