diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 5b10e0c9e26b..f348ff94de6d 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -245,7 +245,12 @@ org.apache.nifi nifi-mongodb-nar nar - + + + org.apache.nifi + nifi-mongodb-services-nar + nar + org.apache.nifi nifi-solr-nar diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-client-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-client-service-api/pom.xml new file mode 100644 index 000000000000..fce95ac36d64 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-client-service-api/pom.xml @@ -0,0 +1,43 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-standard-services + 1.4.0-SNAPSHOT + + + nifi-mongodb-client-service-api + jar + + + + org.apache.nifi + nifi-api + provided + + + org.apache.nifi + nifi-utils + + + org.mongodb + mongo-java-driver + + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java new file mode 100644 index 000000000000..b0f161811008 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mongodb; + +import org.apache.nifi.controller.ControllerService; +import org.bson.Document; + +import java.util.List; + +public interface MongoDBClientService extends ControllerService { + default Document convertJson(String query) { + return Document.parse(query); + } + + long count(Document query); + void delete(Document query); + boolean exists(Document query); + Document findOne(Document query); + List findMany(Document query); + List findMany(Document query, int limit); + List findMany(Document query, Document sort, int limit); + void insert(Document doc); + void insert(List docs); + void update(Document query, Document update); + void update(Document query, Document update, boolean multiple); + void updateOne(Document query, Document update); + void upsert(Document query, Document update); + void dropDatabase(); + void dropCollection(); +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services-nar/pom.xml new file mode 100644 index 000000000000..290ce1ec31c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services-nar/pom.xml @@ -0,0 +1,40 @@ + + + 4.0.0 + + org.apache.nifi + nifi-mongodb-services-bundle + 1.4.0-SNAPSHOT + + nifi-mongodb-services-nar + nar + + + org.apache.nifi + nifi-standard-services-api-nar + nar + + + org.apache.nifi + nifi-lookup-services + + + org.apache.nifi + nifi-mongodb-services + 1.4.0-SNAPSHOT + + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000000..41ac7bb55bc0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,131 @@ +nifi-mongodb-services-nar +Copyright 2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Configuration + The following NOTICE information applies: + Apache Commons Configuration + Copyright 2001-2017 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Apache Commons CSV + The following NOTICE information applies: + Apache Commons CSV + Copyright 2005-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Apache Commons BeanUtils + The following NOTICE information applies: + Apache Commons BeanUtils + Copyright 2000-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache HttpComponents + The following NOTICE information applies: + Apache HttpClient + Copyright 1999-2014 The Apache Software Foundation + + Apache HttpCore + Copyright 2005-2014 The Apache Software Foundation + + This project contains annotations derived from JCIP-ANNOTATIONS + Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Apache Commons Logging + The following NOTICE information applies: + Apache Commons Logging + Copyright 2003-2013 The Apache Software Foundation + + (ASLv2) Apache Commons Net + The following NOTICE information applies: + Apache Commons Net + Copyright 2001-2016 The Apache Software Foundation + + (ASLv2) Apache Commons Collections + The following NOTICE information applies: + Apache Commons Collections + Copyright 2001-2016 The Apache Software Foundation + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2016 The Apache Software Foundation + + (ASLv2) GeoIP2 Java API + The following NOTICE information applies: + GeoIP2 Java API + This software is Copyright (c) 2013 by MaxMind, Inc. + +  (ASLv2) Google HTTP Client Library for Java +    The following NOTICE information applies: +      Copyright 2011 Google Inc. + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + +************************ +Creative Commons Attribution-ShareAlike 3.0 +************************ + +The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0. See project link for details. + + (CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/pom.xml new file mode 100644 index 000000000000..7f7494aa4fb0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/pom.xml @@ -0,0 +1,89 @@ + + + 4.0.0 + + org.apache.nifi + nifi-mongodb-services-bundle + 1.4.0-SNAPSHOT + + nifi-mongodb-services + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-lookup-service-api + + + org.apache.nifi + nifi-utils + + + org.apache.nifi + nifi-record + + + org.apache.nifi + nifi-mongodb-client-service-api + ${project.version} + provided + + + org.mongodb + mongo-java-driver + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + org.apache.nifi + nifi-ssl-context-service-api + compile + + + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/test.csv + src/test/resources/test.properties + src/test/resources/test.xml + + + + + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java new file mode 100644 index 000000000000..8ac05b27030d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mongodb; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoClientOptions.Builder; +import com.mongodb.MongoClientURI; +import com.mongodb.WriteConcern; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.authentication.exception.ProviderCreationException; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.ssl.SSLContextService; +import org.bson.Document; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class AbstractMongoDBControllerService extends AbstractControllerService { + static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED"; + static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED"; + static final String WRITE_CONCERN_FSYNCED = "FSYNCED"; + static final String WRITE_CONCERN_JOURNALED = "JOURNALED"; + static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED"; + static final String WRITE_CONCERN_MAJORITY = "MAJORITY"; + + protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder() + .name("mongo-uri") + .displayName("Mongo URI") + .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]") + .required(true) + .expressionLanguageSupported(true) + .addValidator(Validation.DOCUMENT_VALIDATOR) + .build(); + protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("mongo-db-name") + .displayName("Mongo Database Name") + .description("The name of the database to use") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() + .name("mongo-collection-name") + .displayName("Mongo Collection Name") + .description("The name of the collection to use") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("ssl-context-service") + .displayName("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL " + + "connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + .name("ssl-client-auth") + .displayName("Client Auth") + .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. " + + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context " + + "has been defined and enabled.") + .required(false) + .allowableValues(SSLContextService.ClientAuth.values()) + .defaultValue("REQUIRED") + .build(); + + public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder() + .name("mongo-write-concern") + .displayName("Write Concern") + .description("The write concern to use") + .required(true) + .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED, + WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY) + .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) + .build(); + + static List descriptors = new ArrayList<>(); + + static { + descriptors.add(URI); + descriptors.add(DATABASE_NAME); + descriptors.add(COLLECTION_NAME); + descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(CLIENT_AUTH); + } + + protected MongoClient mongoClient; + + protected final void createClient(ConfigurationContext context) throws IOException { + if (mongoClient != null) { + closeClient(); + } + + getLogger().info("Creating MongoClient"); + + // Set up the client for secure (SSL/TLS communications) if configured to do so + final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue(); + final SSLContext sslContext; + + if (sslService != null) { + final SSLContextService.ClientAuth clientAuth; + if (StringUtils.isBlank(rawClientAuth)) { + clientAuth = SSLContextService.ClientAuth.REQUIRED; + } else { + try { + clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth); + } catch (final IllegalArgumentException iae) { + throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]", + rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", "))); + } + } + sslContext = sslService.createSSLContext(clientAuth); + } else { + sslContext = null; + } + + try { + if(sslContext == null) { + mongoClient = new MongoClient(new MongoClientURI(getURI(context))); + } else { + mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext))); + } + } catch (Exception e) { + getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e); + throw e; + } + } + + protected Builder getClientOptions(final SSLContext sslContext) { + MongoClientOptions.Builder builder = MongoClientOptions.builder(); + builder.sslEnabled(true); + builder.socketFactory(sslContext.getSocketFactory()); + return builder; + } + + @OnStopped + public final void closeClient() { + if (mongoClient != null) { + mongoClient.close(); + mongoClient = null; + } + } + + protected MongoDatabase getDatabase(final ConfigurationContext context) { + return getDatabase(context, null); + } + + protected MongoDatabase getDatabase(final ConfigurationContext context, final FlowFile flowFile) { + final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + return mongoClient.getDatabase(databaseName); + } + + protected MongoCollection getCollection(final ConfigurationContext context) { + return getCollection(context, null); + } + + protected MongoCollection getCollection(final ConfigurationContext context, final FlowFile flowFile) { + final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue(); + return getDatabase(context, flowFile).getCollection(collectionName); + } + + protected String getURI(final ConfigurationContext context) { + return context.getProperty(URI).evaluateAttributeExpressions().getValue(); + } + + protected WriteConcern getWriteConcern(final ConfigurationContext context) { + final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); + WriteConcern writeConcern = null; + switch (writeConcernProperty) { + case WRITE_CONCERN_ACKNOWLEDGED: + writeConcern = WriteConcern.ACKNOWLEDGED; + break; + case WRITE_CONCERN_UNACKNOWLEDGED: + writeConcern = WriteConcern.UNACKNOWLEDGED; + break; + case WRITE_CONCERN_FSYNCED: + writeConcern = WriteConcern.FSYNCED; + break; + case WRITE_CONCERN_JOURNALED: + writeConcern = WriteConcern.JOURNALED; + break; + case WRITE_CONCERN_REPLICA_ACKNOWLEDGED: + writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED; + break; + case WRITE_CONCERN_MAJORITY: + writeConcern = WriteConcern.MAJORITY; + break; + default: + writeConcern = WriteConcern.ACKNOWLEDGED; + } + return writeConcern; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return descriptors; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java new file mode 100644 index 000000000000..fe86429997a3 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mongodb; + +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.UpdateOptions; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.reporting.InitializationException; +import org.bson.Document; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Tags({"mongo", "mongodb", "service"}) +@CapabilityDescription( + "Provides a controller service that wraps most of the functionality of the MongoDB driver." +) +public class MongoDBControllerService extends AbstractMongoDBControllerService implements MongoDBClientService { + private MongoDatabase db; + private MongoCollection col; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { + this.createClient(context); + this.db = this.mongoClient.getDatabase(context.getProperty(MongoDBControllerService.DATABASE_NAME).getValue()); + this.col = this.db.getCollection(context.getProperty(MongoDBControllerService.COLLECTION_NAME).getValue()); + } + + @OnDisabled + public void onDisable() { + this.mongoClient.close(); + } + + @Override + public long count(Document query) { + return this.col.count(query); + } + + @Override + public void delete(Document query) { + this.col.deleteMany(query); + } + + @Override + public boolean exists(Document query) { + return this.col.count(query) > 0; + } + + public Document findOne(Document query) { + MongoCursor cursor = this.col.find(query).limit(1).iterator(); + Document retVal = cursor.next(); + cursor.close(); + + return retVal; + } + + @Override + public List findMany(Document query) { + return findMany(query, null, -1); + } + + @Override + public List findMany(Document query, int limit) { + return findMany(query, null, limit); + } + + @Override + public List findMany(Document query, Document sort, int limit) { + FindIterable fi = this.col.find(query); + if (limit > 0) { + fi = fi.limit(limit); + } + if (sort != null) { + fi = fi.sort(sort); + } + MongoCursor cursor = fi.iterator(); + List retVal = new ArrayList<>(); + while (cursor.hasNext()) { + retVal.add(cursor.next()); + } + cursor.close(); + + return retVal; + } + + @Override + public void insert(Document doc) { + this.col.insertOne(doc); + } + + @Override + public void insert(List docs) { + this.col.insertMany(docs); + } + + @Override + public void update(Document query, Document update, boolean multiple) { + if (multiple) { + this.col.updateMany(query, update); + } else { + this.col.updateOne(query, update); + } + } + + @Override + public void update(Document query, Document update) { + update(query, update, true); + } + + @Override + public void updateOne(Document query, Document update) { + this.update(query, update, false); + } + + @Override + public void upsert(Document query, Document update) { + this.col.updateOne(query, update, new UpdateOptions().upsert(true)); + } + + @Override + public void dropDatabase() { + this.db.drop(); + this.col = null; + } + + @Override + public void dropCollection() { + this.col.drop(); + this.col = null; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java new file mode 100644 index 000000000000..f85274445f9a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mongodb; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.LookupService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.bson.Document; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + + +@Tags({"mongo", "mongodb", "lookup", "record"}) +@CapabilityDescription( + "Provides a lookup service based around MongoDB. Each key that is specified \n" + + "will be added to a query as-is. For example, if you specify the two keys, \n" + + "user and email, the resulting query will be { \"user\": \"tester\", \"email\": \"tester@test.com\" }.\n" + + "The query is limited to the first result (findOne in the Mongo documentation). If no \"Lookup Value Field\" is specified " + + "then the entire MongoDB result document minus the _id field will be returned as a record." +) +public class MongoDBLookupService extends MongoDBControllerService implements LookupService { + + public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder() + .name("mongo-lookup-value-field") + .displayName("Lookup Value Field") + .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " + + "MongoDB result document minus the _id field will be returned as a record.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + private String lookupValueField; + + private static final List lookupDescriptors; + + static { + lookupDescriptors = new ArrayList<>(); + lookupDescriptors.addAll(descriptors); + lookupDescriptors.add(LOOKUP_VALUE_FIELD); + } + + @Override + public Optional lookup(Map coordinates) throws LookupFailureException { + Map clean = new HashMap<>(); + clean.putAll(coordinates); + Document query = new Document(clean); + + if (coordinates.size() == 0) { + throw new LookupFailureException("No keys were configured. Mongo query would return random documents."); + } + + try { + Document result = this.findOne(query); + + if (lookupValueField != null && !lookupValueField.equals("")) { + return Optional.ofNullable(result.get(lookupValueField)); + } else { + final List fields = new ArrayList<>(); + + for (String key : result.keySet()) { + if (key.equals("_id")) { + continue; + } + fields.add(new RecordField(key, RecordFieldType.STRING.getDataType())); + } + + final RecordSchema schema = new SimpleRecordSchema(fields); + return Optional.ofNullable(new MapRecord(schema, result)); + } + } catch (Exception ex) { + getLogger().error("Error during lookup {}", new Object[]{ query.toJson() }, ex); + throw new LookupFailureException(ex); + } + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { + this.lookupValueField = context.getProperty(LOOKUP_VALUE_FIELD).getValue(); + super.onEnabled(context); + } + + @Override + public Class getValueType() { + return Record.class; + } + + @Override + public Set getRequiredKeys() { + return Collections.emptySet(); + } + + @Override + protected List getSupportedPropertyDescriptors() { + + return lookupDescriptors; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/Validation.java b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/Validation.java new file mode 100644 index 000000000000..bad87c8563b5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/Validation.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mongodb; + +import com.mongodb.MongoClientURI; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; + +public class Validation { + public static final Validator DOCUMENT_VALIDATOR = new Validator() { + + @Override + public ValidationResult validate(String subject, String value, ValidationContext context) { + final ValidationResult.Builder builder = new ValidationResult.Builder(); + builder.subject(subject).input(value); + + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return builder.valid(true).explanation("Contains Expression Language").build(); + } + + String reason = null; + try { + new MongoClientURI(value); + } catch (final Exception e) { + reason = e.getLocalizedMessage(); + } + + return builder.explanation(reason).valid(reason == null).build(); + } + }; +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 000000000000..46538c5f0ac5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.mongodb.MongoDBLookupService +org.apache.nifi.mongodb.MongoDBControllerService \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestControllerServiceProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestControllerServiceProcessor.java new file mode 100644 index 000000000000..1305d8c62cec --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestControllerServiceProcessor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mongodb; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.ArrayList; +import java.util.List; + +public class TestControllerServiceProcessor extends AbstractProcessor { + + static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("Client Service") + .description("MongoDBClientService") + .identifiesControllerService(MongoDBClientService.class) + .required(true) + .build(); + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List getSupportedPropertyDescriptors() { + List propDescs = new ArrayList<>(); + propDescs.add(CLIENT_SERVICE); + return propDescs; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestLookupServiceProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestLookupServiceProcessor.java new file mode 100644 index 000000000000..f8b7f0bc5030 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestLookupServiceProcessor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mongodb; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.ArrayList; +import java.util.List; + +public class TestLookupServiceProcessor extends AbstractProcessor { + + static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("Client Service") + .description("MongoDBLookupService") + .identifiesControllerService(MongoDBLookupService.class) + .required(true) + .build(); + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List getSupportedPropertyDescriptors() { + List propDescs = new ArrayList<>(); + propDescs.add(CLIENT_SERVICE); + return propDescs; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestMongoDBControllerService.java b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestMongoDBControllerService.java new file mode 100644 index 000000000000..37fbef51bbf5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestMongoDBControllerService.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mongodb; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.bson.Document; +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +@Ignore("This is an integration test and requires a copy of MongoDB running on localhost") +public class TestMongoDBControllerService { + private static final String DB_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis()); + private static final String COL_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis()); + + private TestRunner runner; + private MongoDBControllerService service; + + @Before + public void before() throws Exception { + runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class); + service = new MongoDBControllerService(); + runner.addControllerService("Client Service", service); + runner.setProperty(service, MongoDBControllerService.DATABASE_NAME, DB_NAME); + runner.setProperty(service, MongoDBControllerService.COLLECTION_NAME, COL_NAME); + runner.setProperty(service, MongoDBControllerService.URI, "mongodb://localhost:27017"); + runner.enableControllerService(service); + } + + @After + public void after() throws Exception { + service.dropDatabase(); + service.onDisable(); + } + + @Test + public void testInit() throws Exception { + runner.assertValid(service); + } + + @Test + public void testBasicCRUD() throws Exception { + Document doc = service.convertJson("{\n" + + "\t\"uuid\": \"x-y-z\",\n" + + "\t\"message\": \"Testing!\"\n" + + "}"); + Document lookup = service.convertJson("{ \"uuid\": \"x-y-z\" }"); + Document update = service.convertJson("{\n" + + "\t\"$set\": {\n" + + "\t\t\"updatedBy\": \"testUser\"\n" + + "\t}\n" + + "}"); + + service.insert(doc); + Document result = service.findOne(lookup); + + Assert.assertNotNull("The result was null", result); + Assert.assertEquals("The UUID did not match", result.getString("uuid"), "x-y-z"); + Assert.assertNotNull("The message block was missing", result.getString("message")); + Assert.assertEquals("The message block did not match", result.getString("message"), "Testing!"); + + service.update(lookup, update, false); + + result = service.findOne(lookup); + + Assert.assertNotNull("The result was null", result); + Assert.assertEquals("The UUID did not match", result.getString("uuid"), "x-y-z"); + Assert.assertNotNull("The message block was missing", result.getString("message")); + Assert.assertEquals("The message block did not match", result.getString("message"), "Testing!"); + Assert.assertNotNull("The updatedBy block was missing", result.getString("updatedBy")); + Assert.assertEquals("The updatedBy block did not match", result.getString("updatedBy"), "testUser"); + + service.delete(lookup); + + boolean exists = service.exists(lookup); + + Assert.assertFalse("After the delete, the document still existed", exists); + } + + @Test + public void testMultipleCRUD() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + List sampleDocuments = new ArrayList<>(); + List uuids = new ArrayList<>(); + Map mappings = new HashMap<>(); + Random random = new Random(); + int count = random.nextInt(1000); + for (int x = 0; x < count; x++) { + Map doc = new HashMap<>(); + String uuid = UUID.randomUUID().toString(); + String ts = Calendar.getInstance().getTime().toString(); + uuids.add(uuid); + mappings.put(uuid, ts); + + doc.put("uuid", uuid); + doc.put("timestamp", ts); + doc.put("randomNumber", random.nextInt(10)); + + String json = mapper.writeValueAsString(doc); + sampleDocuments.add(service.convertJson(json)); + } + + service.insert(sampleDocuments); + + long docCount = service.count(service.convertJson("{}")); + + Assert.assertEquals("The counts did not match", docCount, count); + for (String uuid : uuids) { + Document lookup = service.convertJson(String.format("{ \"uuid\": \"%s\" }", uuid)); + Document result = service.findOne(lookup); + Assert.assertNotNull("The document was not found", result); + Assert.assertEquals("The uuid did not match", result.getString("uuid"), uuid); + Assert.assertEquals("The timestamp did not match", result.getString("timestamp"), mappings.get(uuid)); + } + + Document query = service.convertJson("{ \"randomNumber\": 5 }"); + docCount = service.count(query); + List results = service.findMany(query); + + Assert.assertTrue("Count should have been >= 1", docCount >= 1); + Assert.assertNotNull("Result set was null", results); + Assert.assertEquals("The counts did not match up", docCount, results.size()); + } + + @Test + public void testUpsert() throws Exception { + Document query = service.convertJson(String.format("{ \"uuid\": \"%s\" }", UUID.randomUUID().toString())); + Document update = service.convertJson("{ \"$set\": { \"message\": \"Hello, world\" } }"); + service.upsert(query, update); + + Document result = service.findOne(query); + Assert.assertNotNull("No result returned", result); + Assert.assertEquals("UUID did not match", result.getString("uuid"), query.getString("uuid")); + Assert.assertEquals("Message did not match", result.getString("message"), "Hello, world"); + + Map mappings = new HashMap<>(); + for (int x = 0; x < 5; x++) { + String fieldName = String.format("field_%d", x); + String uuid = UUID.randomUUID().toString(); + mappings.put(fieldName, uuid); + update = service.convertJson(String.format("{ \"$set\": { \"%s\": \"%s\" } }", fieldName, uuid)); + + service.upsert(query, update); + } + + result = service.findOne(query); + + for (Map.Entry entry : mappings.entrySet()) { + Assert.assertEquals("Entry did not match.", entry.getValue(), result.getString(entry.getKey())); + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestMongoDBLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestMongoDBLookupService.java new file mode 100644 index 000000000000..ff220ae4e758 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestMongoDBLookupService.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mongodb; + +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.bson.Document; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.Calendar; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +@Ignore("This is an integration test and requires a copy of MongoDB running on localhost") +public class TestMongoDBLookupService { + private static final String DB_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis()); + private static final String COL_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis()); + + private TestRunner runner; + private MongoDBLookupService service; + + @Before + public void before() throws Exception { + runner = TestRunners.newTestRunner(TestLookupServiceProcessor.class); + service = new MongoDBLookupService(); + runner.addControllerService("Client Service", service); + runner.setProperty(service, MongoDBLookupService.DATABASE_NAME, DB_NAME); + runner.setProperty(service, MongoDBLookupService.COLLECTION_NAME, COL_NAME); + runner.setProperty(service, MongoDBLookupService.URI, "mongodb://localhost:27017"); + runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message"); + } + + @After + public void after() throws Exception { + service.dropDatabase(); + service.onDisable(); + } + + @Test + public void testInit() throws Exception { + runner.enableControllerService(service); + runner.assertValid(service); + } + + @Test + public void testLookupSingle() throws Exception { + runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message"); + runner.enableControllerService(service); + Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }"); + service.insert(document); + + Map criteria = new HashMap<>(); + criteria.put("uuid", "x-y-z"); + Optional result = service.lookup(criteria); + + Assert.assertNotNull("The value was null.", result.get()); + Assert.assertEquals("The value was wrong.", "Hello, world", result.get()); + + Map clean = new HashMap<>(); + clean.putAll(criteria); + service.delete(new Document(clean)); + + boolean error = false; + try { + service.lookup(criteria); + } catch (LookupFailureException ex) { + error = true; + } + + Assert.assertTrue("An error should have been thrown.", error); + } + + @Test + public void testLookupRecord() throws Exception { + runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, ""); + runner.enableControllerService(service); + Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }"); + service.insert(document); + + Map criteria = new HashMap<>(); + criteria.put("uuid", "x-y-z"); + Optional result = service.lookup(criteria); + + Assert.assertNotNull("The value was null.", result.get()); + Assert.assertTrue("The value was wrong.", result.get() instanceof MapRecord); + MapRecord record = (MapRecord)result.get(); + Assert.assertEquals("The value was wrong.", "Hello, world", record.getAsString("message")); + Assert.assertEquals("The value was wrong.", "x-y-z", record.getAsString("uuid")); + + Map clean = new HashMap<>(); + clean.putAll(criteria); + service.delete(new Document(clean)); + + boolean error = false; + try { + service.lookup(criteria); + } catch (LookupFailureException ex) { + error = true; + } + + Assert.assertTrue("An error should have been thrown.", error); + } + + @Test + public void testServiceParameters() throws Exception { + runner.enableControllerService(service); + Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }"); + service.insert(document); + + Map criteria = new HashMap<>(); + criteria.put("uuid", "x-y-z"); + + boolean error = false; + try { + service.lookup(criteria); + } catch(Exception ex) { + error = true; + } + + Assert.assertFalse("An error was thrown when no error should have been thrown.", error); + error = false; + + try { + service.lookup(new HashMap()); + } catch (Exception ex) { + error = true; + Assert.assertTrue("The exception was the wrong type", ex instanceof LookupFailureException); + } + + Assert.assertTrue("An error was not thrown when the input was empty", error); + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/pom.xml new file mode 100644 index 000000000000..c2d84e5336d5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-mongodb-services-bundle/pom.xml @@ -0,0 +1,28 @@ + + + 4.0.0 + + org.apache.nifi + nifi-standard-services + 1.4.0-SNAPSHOT + + nifi-mongodb-services-bundle + pom + + nifi-mongodb-services + nifi-mongodb-services-nar + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml index 556a704334af..8281cf97c57c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml @@ -51,6 +51,11 @@ nifi-dbcp-service-api compile + + org.apache.nifi + nifi-mongodb-client-service-api + compile + org.apache.nifi nifi-hbase-client-service-api diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml index 8ae43b7c1598..5a44dc719fd1 100644 --- a/nifi-nar-bundles/nifi-standard-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/pom.xml @@ -41,5 +41,7 @@ nifi-record-serialization-service-api nifi-record-serialization-services-bundle nifi-hwx-schema-registry-bundle + nifi-mongodb-client-service-api + nifi-mongodb-services-bundle diff --git a/pom.xml b/pom.xml index b719f6f51543..c8619d1e101b 100644 --- a/pom.xml +++ b/pom.xml @@ -1109,7 +1109,7 @@ org.apache.nifi - nifi-kudu-nar + nifi-mongodb-services-nar 1.4.0-SNAPSHOT nar @@ -1597,6 +1597,11 @@ nifi-hbase-client-service-api 1.4.0-SNAPSHOT + + org.apache.nifi + nifi-mongodb-client-service-api + 1.4.0-SNAPSHOT + org.apache.nifi nifi-websocket-services-api