Add tests and resolve ClassCastException running SparkGraphComputer on HBase

Signed-off-by: sjudeng <sjudeng@users.noreply.github.com>
sjudeng committed Feb 5, 2017
1 parent 0a2a117 commit e4b57f1
Showing 9 changed files with 243 additions and 100 deletions.
16 changes: 14 additions & 2 deletions janusgraph-hadoop-parent/janusgraph-hadoop-2/pom.xml
@@ -40,6 +40,14 @@
<scope>test</scope>
<optional>true</optional>
</dependency>
<!-- Include janusgraph-hbase-core to resolve Guava StopWatch error in HBase tests.
Can be removed when Guava version is updated in HBase -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>janusgraph-hbase-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
@@ -50,20 +58,24 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase098.version}</version>
<version>${hbase100.version}</version>
<optional>true</optional>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
<exclusion>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase098.version}</version>
<version>${hbase100.version}</version>
<optional>true</optional>
<scope>test</scope>
</dependency>
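On the janusgraph-hbase-core test dependency added above: the "Guava StopWatch error" its comment mentions is most likely (an inference from the comment, not stated in the commit) the well-known conflict where HBase code compiled against Guava <= 16 calls `new Stopwatch().start()`, a constructor that newer Guava versions on the test classpath no longer expose publicly. A minimal sketch of the surviving, factory-based Guava API:

import java.util.concurrent.TimeUnit;

import com.google.common.base.Stopwatch;

// Sketch only: `new Stopwatch()` compiled against old Guava fails at runtime
// against newer Guava, which exposes static factories instead.
public class StopwatchSketch {
    public static void main(String[] args) throws InterruptedException {
        Stopwatch watch = Stopwatch.createStarted(); // factory replaces the old public constructor
        Thread.sleep(10);
        System.out.println(watch.elapsed(TimeUnit.MILLISECONDS) + " ms");
    }
}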
org/janusgraph/hadoop/formats/hbase/HBaseBinaryInputFormat.java
@@ -24,8 +24,10 @@
import org.janusgraph.hadoop.formats.util.AbstractBinaryInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
@@ -46,7 +48,7 @@ public class HBaseBinaryInputFormat extends AbstractBinaryInputFormat {
private static final Logger log = LoggerFactory.getLogger(HBaseBinaryInputFormat.class);

private final TableInputFormat tableInputFormat = new TableInputFormat();
private TableRecordReader tableReader;
private RecordReader<ImmutableBytesWritable, Result> tableReader;
private byte[] inputCFBytes;
private RecordReader<StaticBuffer, Iterable<Entry>> janusgraphRecordReader;

@@ -57,8 +59,7 @@ public List<InputSplit> getSplits(final JobContext jobContext) throws IOException {

@Override
public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
tableReader =
(TableRecordReader) tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
tableReader = tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
janusgraphRecordReader =
new HBaseBinaryRecordReader(tableReader, inputCFBytes);
return janusgraphRecordReader;
@@ -104,7 +105,7 @@ public void setConf(final Configuration config) {
this.tableInputFormat.setConf(config);
}

public TableRecordReader getTableReader() {
public RecordReader<ImmutableBytesWritable, Result> getTableReader() {
return tableReader;
}

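The cast removed above is the source of the ClassCastException in the commit title: with the pom moving from HBase 0.98 to 1.0, TableInputFormatBase.createRecordReader no longer returns its TableRecordReader directly but an anonymous wrapper (so close() can also release the table), and the hard cast fails at runtime. The fix types the field against the RecordReader interface instead. A self-contained sketch with simplified stand-in types (not the real HBase API):

import java.io.Closeable;
import java.io.IOException;

// Sketch of the failure this commit fixes: the factory returns a wrapper
// around the concrete reader, so a hard cast to the concrete class fails.
public class RecordReaderCastSketch {

    interface RecordReader extends Closeable {
        String nextValue();
    }

    static class TableRecordReader implements RecordReader {
        @Override public String nextValue() { return "row"; }
        @Override public void close() {}
    }

    // Mirrors what HBase 1.x's TableInputFormatBase.createRecordReader does:
    // wrap the TableRecordReader so close() can also release the table.
    static RecordReader createRecordReader() {
        final TableRecordReader delegate = new TableRecordReader();
        return new RecordReader() {
            @Override public String nextValue() { return delegate.nextValue(); }
            @Override public void close() throws IOException { delegate.close(); }
        };
    }

    public static void main(String[] args) {
        RecordReader reader = createRecordReader();
        System.out.println(reader.nextValue());                 // fine: program to the interface
        TableRecordReader broken = (TableRecordReader) reader;  // throws ClassCastException
        System.out.println(broken);
    }
}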
org/janusgraph/hadoop/formats/hbase/HBaseBinaryRecordReader.java
@@ -15,11 +15,12 @@
package org.janusgraph.hadoop.formats.hbase;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.util.StaticArrayBuffer;
import org.janusgraph.diskstorage.util.StaticArrayEntry;
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -31,11 +32,11 @@

public class HBaseBinaryRecordReader extends RecordReader<StaticBuffer, Iterable<Entry>> {

private TableRecordReader reader;
private RecordReader<ImmutableBytesWritable, Result> reader;

private final byte[] edgestoreFamilyBytes;

public HBaseBinaryRecordReader(final TableRecordReader reader, final byte[] edgestoreFamilyBytes) {
public HBaseBinaryRecordReader(final RecordReader<ImmutableBytesWritable, Result> reader, final byte[] edgestoreFamilyBytes) {
this.reader = reader;
this.edgestoreFamilyBytes = edgestoreFamilyBytes;
}
@@ -66,7 +67,7 @@ public void close() throws IOException {
}

@Override
public float getProgress() {
public float getProgress() throws IOException, InterruptedException {
return this.reader.getProgress();
}

org/janusgraph/hadoop/AbstractInputFormatIT.java
@@ -0,0 +1,115 @@
// Copyright 2017 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.hadoop;

import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.diskstorage.configuration.WriteConfiguration;
import org.janusgraph.example.GraphOfTheGodsFactory;
import org.janusgraph.graphdb.JanusGraphBaseTest;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public abstract class AbstractInputFormatIT extends JanusGraphBaseTest {

@Test
public void testReadGraphOfTheGods() throws Exception {
GraphOfTheGodsFactory.load(graph, null, true);
assertEquals(12L, (long) graph.traversal().V().count().next());
Graph g = getGraph();
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
assertEquals(12L, (long) t.V().count().next());
}

@Test
public void testReadWideVertexWithManyProperties() throws Exception {
int numProps = 1 << 16;

long numV = 1;
mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make();
mgmt.commit();
finishSchema();

for (int j = 0; j < numV; j++) {
Vertex v = graph.addVertex();
for (int i = 0; i < numProps; i++) {
v.property("p", i);
}
}
graph.tx().commit();

assertEquals(numV, (long) graph.traversal().V().count().next());
Map<String, Object> propertiesOnVertex = graph.traversal().V().valueMap().next();
List<?> valuesOnP = (List)propertiesOnVertex.values().iterator().next();
assertEquals(numProps, valuesOnP.size());
Graph g = getGraph();
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
assertEquals(numV, (long) t.V().count().next());
propertiesOnVertex = t.V().valueMap().next();
valuesOnP = (List)propertiesOnVertex.values().iterator().next();
assertEquals(numProps, valuesOnP.size());
}

@Test
public void testReadSelfEdge() throws Exception {
GraphOfTheGodsFactory.load(graph, null, true);
assertEquals(12L, (long) graph.traversal().V().count().next());

// Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes
JanusGraphVertex sky = (JanusGraphVertex)graph.query().has("name", "sky").vertices().iterator().next();
assertNotNull(sky);
assertEquals("sky", sky.value("name"));
assertEquals(1L, sky.query().direction(Direction.IN).edgeCount());
assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount());
assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount());
sky.addEdge("lives", sky, "reason", "testReadSelfEdge");
assertEquals(2L, sky.query().direction(Direction.IN).edgeCount());
assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount());
assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount());
graph.tx().commit();

// Read the new edge using the inputformat
Graph g = getGraph();
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
Iterator<Object> edgeIdIter = t.V().has("name", "sky").bothE().id();
assertNotNull(edgeIdIter);
assertTrue(edgeIdIter.hasNext());
Set<Object> edges = Sets.newHashSet(edgeIdIter);
assertEquals(2, edges.size());
}

abstract protected Graph getGraph() throws IOException, ConfigurationException;
}
org/janusgraph/hadoop/CassandraInputFormatIT.java
@@ -14,102 +14,28 @@

package org.janusgraph.hadoop;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.janusgraph.CassandraStorageSetup;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
import org.janusgraph.diskstorage.configuration.WriteConfiguration;
import org.janusgraph.example.GraphOfTheGodsFactory;
import org.janusgraph.graphdb.JanusGraphBaseTest;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Test;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
public class CassandraInputFormatIT extends AbstractInputFormatIT {

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class CassandraInputFormatIT extends JanusGraphBaseTest {


@Test
public void testReadGraphOfTheGods() {
GraphOfTheGodsFactory.load(graph, null, true);
assertEquals(12L, (long) graph.traversal().V().count().next());
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
assertEquals(12L, (long) t.V().count().next());
}

@Test
public void testReadWideVertexWithManyProperties() {
int numProps = 1 << 16;

long numV = 1;
mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make();
mgmt.commit();
finishSchema();

for (int j = 0; j < numV; j++) {
Vertex v = graph.addVertex();
for (int i = 0; i < numProps; i++) {
v.property("p", i);
}
}
graph.tx().commit();

assertEquals(numV, (long) graph.traversal().V().count().next());
Map<String, Object> propertiesOnVertex = graph.traversal().V().valueMap().next();
List<?> valuesOnP = (List)propertiesOnVertex.values().iterator().next();
assertEquals(numProps, valuesOnP.size());
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
assertEquals(numV, (long) t.V().count().next());
propertiesOnVertex = t.V().valueMap().next();
valuesOnP = (List)propertiesOnVertex.values().iterator().next();
assertEquals(numProps, valuesOnP.size());
}

@Test
public void testReadSelfEdge() {
GraphOfTheGodsFactory.load(graph, null, true);
assertEquals(12L, (long) graph.traversal().V().count().next());

// Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes
JanusGraphVertex sky = (JanusGraphVertex)graph.query().has("name", "sky").vertices().iterator().next();
assertNotNull(sky);
assertEquals("sky", sky.value("name"));
assertEquals(1L, sky.query().direction(Direction.IN).edgeCount());
assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount());
assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount());
sky.addEdge("lives", sky, "reason", "testReadSelfEdge");
assertEquals(2L, sky.query().direction(Direction.IN).edgeCount());
assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount());
assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount());
graph.tx().commit();

// Read the new edge using the inputformat
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
Iterator<Object> edgeIdIter = t.V().has("name", "sky").bothE().id();
assertNotNull(edgeIdIter);
assertTrue(edgeIdIter.hasNext());
Set<Object> edges = Sets.newHashSet(edgeIdIter);
assertEquals(2, edges.size());
protected Graph getGraph() throws ConfigurationException, IOException {
final PropertiesConfiguration config = new PropertiesConfiguration("target/test-classes/cassandra-read.properties");
Path baseOutDir = Paths.get((String) config.getProperty("gremlin.hadoop.outputLocation"));
baseOutDir.toFile().mkdirs();
String outDir = Files.createTempDirectory(baseOutDir, null).toAbsolutePath().toString();
config.setProperty("gremlin.hadoop.outputLocation", outDir);
return GraphFactory.open(config);
}

@Override
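The remaining changed files, including the HBase counterpart of this integration test, did not render above. A hypothetical sketch of how such a subclass could plug into AbstractInputFormatIT, assuming an hbase-read.properties analogous to cassandra-read.properties (names illustrative, not the commit's actual code; other overrides inherited from JanusGraphBaseTest, such as getConfiguration, are omitted):

package org.janusgraph.hadoop;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

// Hypothetical sketch only; the commit's real HBase test is not shown above.
public class HBaseInputFormatIT extends AbstractInputFormatIT {

    @Override
    protected Graph getGraph() throws ConfigurationException, IOException {
        // Assumes an hbase-read.properties wired to HBaseBinaryInputFormat,
        // mirroring cassandra-read.properties in the Cassandra test above.
        final PropertiesConfiguration config =
                new PropertiesConfiguration("target/test-classes/hbase-read.properties");
        // Give each run a fresh output directory so repeated jobs do not collide.
        Path baseOutDir = Paths.get((String) config.getProperty("gremlin.hadoop.outputLocation"));
        baseOutDir.toFile().mkdirs();
        String outDir = Files.createTempDirectory(baseOutDir, null).toAbsolutePath().toString();
        config.setProperty("gremlin.hadoop.outputLocation", outDir);
        return GraphFactory.open(config);
    }
}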
