Skip to content

Commit

Permalink
Improve handling of UDA and UDF metadata
Browse files Browse the repository at this point in the history
patch by Aleksey Yeschenko; reviewed by Robert Stupp for CASSANDRA-9665
  • Loading branch information
iamaleksey committed Jun 30, 2015
1 parent 67db844 commit 3566843
Show file tree
Hide file tree
Showing 26 changed files with 621 additions and 423 deletions.
5 changes: 2 additions & 3 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
3.0:
* Improve log output from unit tests (CASSANDRA-9528)
3.0
* Add algorithmic token allocation (CASSANDRA-7032)
* Add nodetool command to replay batchlog (CASSANDRA-9547)
* Make file buffer cache independent of paths being read (CASSANDRA-8897)
Expand Down Expand Up @@ -39,7 +38,7 @@ Merged from 2.1:
* Fix bug in cardinality check when compacting (CASSANDRA-9580)
* Fix memory leak in Ref due to ConcurrentLinkedQueue.remove() behaviour (CASSANDRA-9549)
* Make rebuild only run one at a time (CASSANDRA-9119)
Merged from 2.0
Merged from 2.0:
* Improve trace messages for RR (CASSANDRA-9479)
* Fix suboptimal secondary index selection when restricted
clustering column is also indexed (CASSANDRA-9631)
Expand Down
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/auth/FunctionResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import com.google.common.base.Joiner;
Expand All @@ -31,7 +32,6 @@
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.cql3.functions.FunctionName;
import org.apache.cassandra.cql3.functions.Functions;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TypeParser;

Expand Down Expand Up @@ -244,7 +244,7 @@ public boolean exists()
case KEYSPACE:
return Schema.instance.getKeyspaces().contains(keyspace);
case FUNCTION:
return Functions.find(getFunctionName(), argTypes) != null;
return Schema.instance.findFunction(getFunctionName(), argTypes).isPresent();
}
throw new AssertionError();
}
Expand All @@ -258,9 +258,9 @@ public Set<Permission> applicablePermissions()
return COLLECTION_LEVEL_PERMISSIONS;
case FUNCTION:
{
Function function = Functions.find(getFunctionName(), argTypes);
assert function != null : "Unable to find function object for resource " + toString();
return function.isAggregate() ? AGGREGATE_FUNCTION_PERMISSIONS : SCALAR_FUNCTION_PERMISSIONS;
Optional<Function> function = Schema.instance.findFunction(getFunctionName(), argTypes);
assert function.isPresent() : "Unable to find function object for resource " + toString();
return function.get().isAggregate() ? AGGREGATE_FUNCTION_PERMISSIONS : SCALAR_FUNCTION_PERMISSIONS;
}
}
throw new AssertionError();
Expand Down
43 changes: 31 additions & 12 deletions src/java/org/apache/cassandra/config/KSMetaData.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.schema.Functions;
import org.apache.cassandra.service.StorageService;

public final class KSMetaData
Expand All @@ -34,13 +35,14 @@ public final class KSMetaData
public final boolean durableWrites;

public final UTMetaData userTypes;
public final Functions functions;

public KSMetaData(String name,
Class<? extends AbstractReplicationStrategy> strategyClass,
Map<String, String> strategyOptions,
boolean durableWrites)
{
this(name, strategyClass, strategyOptions, durableWrites, Collections.<CFMetaData>emptyList(), new UTMetaData());
this(name, strategyClass, strategyOptions, durableWrites, Collections.<CFMetaData>emptyList(), new UTMetaData(), Functions.none());
}

public KSMetaData(String name,
Expand All @@ -49,15 +51,26 @@ public KSMetaData(String name,
boolean durableWrites,
Iterable<CFMetaData> cfDefs)
{
this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData());
this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData(), Functions.none());
}

public KSMetaData(String name,
Class<? extends AbstractReplicationStrategy> strategyClass,
Map<String, String> strategyOptions,
boolean durableWrites,
Iterable<CFMetaData> cfDefs,
Functions functions)
{
this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData(), functions);
}

private KSMetaData(String name,
Class<? extends AbstractReplicationStrategy> strategyClass,
Map<String, String> strategyOptions,
boolean durableWrites,
Iterable<CFMetaData> cfDefs,
UTMetaData userTypes)
UTMetaData userTypes,
Functions functions)
{
this.name = name;
this.strategyClass = strategyClass == null ? NetworkTopologyStrategy.class : strategyClass;
Expand All @@ -68,6 +81,7 @@ private KSMetaData(String name,
this.cfMetaData = Collections.unmodifiableMap(cfmap);
this.durableWrites = durableWrites;
this.userTypes = userTypes;
this.functions = functions;
}

// For new user created keyspaces (through CQL)
Expand All @@ -82,7 +96,7 @@ public static KSMetaData newKeyspace(String name, String strategyName, Map<Strin

public static KSMetaData newKeyspace(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> options, boolean durablesWrites, Iterable<CFMetaData> cfDefs)
{
return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs, new UTMetaData());
return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs, new UTMetaData(), Functions.none());
}

public KSMetaData cloneWithTableRemoved(CFMetaData table)
Expand All @@ -91,7 +105,7 @@ public KSMetaData cloneWithTableRemoved(CFMetaData table)
List<CFMetaData> newTables = new ArrayList<>(cfMetaData().values());
newTables.remove(table);
assert newTables.size() == cfMetaData().size() - 1;
return cloneWith(newTables, userTypes);
return cloneWith(newTables, userTypes, functions);
}

public KSMetaData cloneWithTableAdded(CFMetaData table)
Expand All @@ -100,12 +114,17 @@ public KSMetaData cloneWithTableAdded(CFMetaData table)
List<CFMetaData> newTables = new ArrayList<>(cfMetaData().values());
newTables.add(table);
assert newTables.size() == cfMetaData().size() + 1;
return cloneWith(newTables, userTypes);
return cloneWith(newTables, userTypes, functions);
}

public KSMetaData cloneWith(Iterable<CFMetaData> tables, UTMetaData types)
public KSMetaData cloneWith(Iterable<CFMetaData> tables, UTMetaData types, Functions functions)
{
return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types);
return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types, functions);
}

public KSMetaData cloneWith(Functions functions)
{
return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, cfMetaData.values(), userTypes, functions);
}

public static KSMetaData testMetadata(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> strategyOptions, CFMetaData... cfDefs)
Expand All @@ -121,7 +140,7 @@ public static KSMetaData testMetadataNotDurable(String name, Class<? extends Abs
@Override
public int hashCode()
{
return Objects.hashCode(name, strategyClass, strategyOptions, cfMetaData, durableWrites, userTypes);
return Objects.hashCode(name, strategyClass, strategyOptions, cfMetaData, durableWrites, functions, userTypes);
}

@Override
Expand All @@ -140,6 +159,7 @@ public boolean equals(Object o)
&& Objects.equal(strategyOptions, other.strategyOptions)
&& Objects.equal(cfMetaData, other.cfMetaData)
&& Objects.equal(durableWrites, other.durableWrites)
&& Objects.equal(functions, other.functions)
&& Objects.equal(userTypes, other.userTypes);
}

Expand All @@ -157,6 +177,7 @@ public String toString()
.add("strategyOptions", strategyOptions)
.add("cfMetaData", cfMetaData)
.add("durableWrites", durableWrites)
.add("functions", functions)
.add("userTypes", userTypes)
.toString();
}
Expand All @@ -176,9 +197,7 @@ public KSMetaData validate() throws ConfigurationException
IEndpointSnitch eps = DatabaseDescriptor.getEndpointSnitch();
AbstractReplicationStrategy.validateReplicationStrategy(name, strategyClass, tmd, eps, strategyOptions);

for (CFMetaData cfm : cfMetaData.values())
cfm.validate();

cfMetaData.values().forEach(CFMetaData::validate);
return this;
}
}
117 changes: 80 additions & 37 deletions src/java/org/apache/cassandra/config/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.cql3.functions.Functions;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.cql3.functions.FunctionName;
import org.apache.cassandra.cql3.functions.UDAggregate;
import org.apache.cassandra.cql3.functions.UDFunction;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.schema.LegacySchemaTables;
Expand Down Expand Up @@ -282,14 +284,6 @@ public Set<String> getKeyspaces()
return keyspaces.keySet();
}

/**
* @return collection of the metadata about all keyspaces registered in the system (system and non-system)
*/
public Collection<KSMetaData> getKeyspaceDefinitions()
{
return keyspaces.values();
}

/**
* Update (or insert) new keyspace definition
*
Expand Down Expand Up @@ -362,6 +356,45 @@ public void purge(CFMetaData cfm)
cfm.markPurged();
}

/* Function helpers */

/**
* Get all function overloads with the specified name
*
* @param name fully qualified function name
* @return an empty list if the keyspace or the function name are not found;
* a non-empty collection of {@link Function} otherwise
*/
public Collection<Function> getFunctions(FunctionName name)
{
if (!name.hasKeyspace())
throw new IllegalArgumentException(String.format("Function name must be fully quallified: got %s", name));

KSMetaData ksm = getKSMetaData(name.keyspace);
return ksm == null
? Collections.emptyList()
: ksm.functions.get(name);
}

/**
* Find the function with the specified name
*
* @param name fully qualified function name
* @param argTypes function argument types
* @return an empty {@link Optional} if the keyspace or the function name are not found;
* a non-empty optional of {@link Function} otherwise
*/
public Optional<Function> findFunction(FunctionName name, List<AbstractType<?>> argTypes)
{
if (!name.hasKeyspace())
throw new IllegalArgumentException(String.format("Function name must be fully quallified: got %s", name));

KSMetaData ksm = getKSMetaData(name.keyspace);
return ksm == null
? Optional.empty()
: ksm.functions.find(name, argTypes);
}

/* Version control */

/**
Expand Down Expand Up @@ -420,7 +453,7 @@ public void updateKeyspace(String ksName)
{
KSMetaData oldKsm = getKSMetaData(ksName);
assert oldKsm != null;
KSMetaData newKsm = LegacySchemaTables.createKeyspaceFromName(ksName).cloneWith(oldKsm.cfMetaData().values(), oldKsm.userTypes);
KSMetaData newKsm = LegacySchemaTables.createKeyspaceFromName(ksName).cloneWith(oldKsm.cfMetaData().values(), oldKsm.userTypes, oldKsm.functions);

setKeyspaceDefinition(newKsm);

Expand Down Expand Up @@ -552,57 +585,67 @@ public void dropType(UserType ut)

public void addFunction(UDFunction udf)
{
logger.info("Loading {}", udf);

Functions.addOrReplaceFunction(udf);

addFunctionInternal(udf);
MigrationManager.instance.notifyCreateFunction(udf);
}

public void updateFunction(UDFunction udf)
{
logger.info("Updating {}", udf);

Functions.addOrReplaceFunction(udf);

updateFunctionInternal(udf);
MigrationManager.instance.notifyUpdateFunction(udf);
}

public void dropFunction(UDFunction udf)
{
logger.info("Drop {}", udf);

// TODO: this is kind of broken as this remove all overloads of the function name
Functions.removeFunction(udf.name(), udf.argTypes());

dropFunctionInternal(udf);
MigrationManager.instance.notifyDropFunction(udf);
}

public void addAggregate(UDAggregate udf)
public void addAggregate(UDAggregate uda)
{
logger.info("Loading {}", udf);

Functions.addOrReplaceFunction(udf);
addFunctionInternal(uda);
MigrationManager.instance.notifyCreateAggregate(uda);
}

MigrationManager.instance.notifyCreateAggregate(udf);
public void updateAggregate(UDAggregate uda)
{
updateFunctionInternal(uda);
MigrationManager.instance.notifyUpdateAggregate(uda);
}

public void updateAggregate(UDAggregate udf)
public void dropAggregate(UDAggregate uda)
{
logger.info("Updating {}", udf);
dropFunctionInternal(uda);
MigrationManager.instance.notifyDropAggregate(uda);
}

Functions.addOrReplaceFunction(udf);
private void addFunctionInternal(Function fun)
{
assert fun instanceof UDFunction || fun instanceof UDAggregate;

MigrationManager.instance.notifyUpdateAggregate(udf);
KSMetaData oldKsm = getKSMetaData(fun.name().keyspace);
assert oldKsm != null;
KSMetaData newKsm = oldKsm.cloneWith(oldKsm.functions.with(fun));
setKeyspaceDefinition(newKsm);
}

public void dropAggregate(UDAggregate udf)
private void updateFunctionInternal(Function fun)
{
logger.info("Drop {}", udf);
assert fun instanceof UDFunction || fun instanceof UDAggregate;

KSMetaData oldKsm = getKSMetaData(fun.name().keyspace);
assert oldKsm != null;
KSMetaData newKsm = oldKsm.cloneWith(oldKsm.functions.without(fun.name(), fun.argTypes()).with(fun));
setKeyspaceDefinition(newKsm);
}

// TODO: this is kind of broken as this remove all overloads of the function name
Functions.removeFunction(udf.name(), udf.argTypes());
private void dropFunctionInternal(Function fun)
{
assert fun instanceof UDFunction || fun instanceof UDAggregate;

MigrationManager.instance.notifyDropAggregate(udf);
KSMetaData oldKsm = getKSMetaData(fun.name().keyspace);
assert oldKsm != null;
KSMetaData newKsm = oldKsm.cloneWith(oldKsm.functions.without(fun.name(), fun.argTypes()));
setKeyspaceDefinition(newKsm);
}
}
Loading

0 comments on commit 3566843

Please sign in to comment.