Building a Simple Custom Processor With Apache Ni-Fi


Apache NiFi is a powerful tool for automating data flows with a large number of built-in processors. However, sometimes there is a need for specific processors to handle unique requirements and data stores. In such cases, creating custom processors becomes a necessity. In this article, we will go through the steps to create a very simple custom processor for Apache NiFi.

Setup and sources

gradle setup

dependencies {
compile “org.apache.nifi:nifi-api:*”
compile “org.apache.nifi:nifi-utils:1.9.2”
testCompile “org.apache.nifi:nifi-mock:1.9.2”
}


source of the custom processor

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

@Tags({“example”})
@CapabilityDescription(“Hello World to output”)
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = “”, description = “”)})
@WritesAttributes({@WritesAttribute(attribute = “”, description = “”)})
public class MyProcessor extends AbstractProcessor {

public static final PropertyDescriptor propertyDescriptor = new PropertyDescriptor
.Builder().name(“name”)
.displayName(“name”)
.description(“Name to print”)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final Relationship successRelations = new Relationship.Builder()
.name(“success”)
.description(“If everything is ok”)
.build();

public static final Relationship failureRelations = new Relationship.Builder()
.name(“failure”)
.description(“If something is wrong”)
.build();

private List<PropertyDescriptor> descriptorList;

private Set<Relationship> relationshipSet;

@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(propertyDescriptor);
this.descriptorList = Collections.unmodifiableList(descriptors);

final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(successRelations);
relationships.add(failureRelations);
this.relationshipSet = Collections.unmodifiableSet(relationships);
}

public Set<Relationship> getRelationshipSet() {
return this.relationshipSet;
}

@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptorList;
}

@OnScheduled
public void onScheduled(final ProcessContext context) {

}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

try {
String name = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue();

String result = “Hello ” + name;
session.putAttribute(flowFile, “result”, result);

try (OutputStream flowFileOutputStream = session.write(flowFile)) {
flowFileOutputStream.write(result.getBytes(StandardCharsets.UTF_8));
}

session.transfer(flowFile, successRelations);
} catch (IOException e) {
//IO Error processing error log
session.transfer(flowFile, failureRelations);
} catch (ProcessException e) {
// Process error log
getLogger().error(“Processing error”, e);
session.transfer(flowFile, failureRelations);
}
}
}

Build and deploy

Build the project with Gradle: gradle build.
Find the generated .nar file in the target folder of your module.
Copy the .nar file to the lib folder of your NiFi installation.


Screenshot of how the custom processor integrates seamlessly


I hope this article was useful for you! Creating a custom processor for Apache NiFi allows you to customize it to meet the specific requirements of your project. By following the steps above, you will be able to develop and integrate a custom processor that will enhance the processing capabilities of your system.

Leave a Comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.