Part 3 - Building a Custom Source Component
So far in this series I have shown how to create a custom connection manager and how to create a user-friendly interface to configure and test it. In this post I’m going to outline the steps I went through to create a custom source component for SSIS. By the end of this post I will show messages in a RabbitMQ queue being consumed by an SSIS package.
Preparing the Solution
The first step is to create a new class library project in the solution. I called mine SSISRabbitMQ.RabbitMQSource. Then add references to the SSIS assemblies.
These can be found in C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies
Then, using Part 1 as a reference, perform the following steps:
- Add the RabbitMQ client library via NuGet
- Add a post build step to copy the output assembly into the GAC
Then add a new class file called RabbitMQSource.cs into the project.
Creating the Custom Source
To have SSDT BI recognize the RabbitMQSource class as a source component, two things need to be done:
- Apply the DtsPipelineComponent attribute
- Extend and override methods from the PipelineComponent class (MSDN documentation)
The DtsPipelineComponent attribute is similar to the DtsConnection attribute that was applied to the custom connection manager in the first post. It signals to SSDT BI that the class is a custom pipeline component.
There are a number of properties that need to be set:
- DisplayName - this is what will be displayed in the SSIS Toolbox in SSDT BI
- Description - this will also be displayed in the SSIS Toolbox
- ComponentType - this tells SSDT BI what kind of component the class is (Source, Destination or Transformation)
After applying the attribute and extending the PipelineComponent class, the skeleton of the source component is in place.
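The real component is C# compiled against the SSIS assemblies, so it cannot run on its own. As a rough, stdlib-only Python model of the shape described above, a decorator can play the role of the DtsPipelineComponent attribute (recording DisplayName, Description and ComponentType) and a base class can expose no-op hooks to override. Every name here (`dts_pipeline_component`, `PipelineComponent`, the snake_case hooks, the description text) is hypothetical and only mirrors the SSIS concepts.

```python
from enum import Enum


class ComponentType(Enum):
    """The three kinds of component the designer distinguishes."""
    SOURCE = "Source"
    DESTINATION = "Destination"
    TRANSFORMATION = "Transformation"


def dts_pipeline_component(display_name, description, component_type):
    """Hypothetical stand-in for the DtsPipelineComponent attribute:
    it stores the designer metadata on the decorated class."""
    def decorate(cls):
        cls.display_name = display_name
        cls.description = description
        cls.component_type = component_type
        return cls
    return decorate


class PipelineComponent:
    """Hypothetical base class whose no-op hooks a concrete
    component overrides, mirroring the SSIS PipelineComponent."""

    def provide_component_properties(self):
        pass

    def validate(self):
        return True

    def acquire_connections(self, transaction=None):
        pass

    def release_connections(self):
        pass

    def pre_execute(self):
        pass

    def prime_output(self, buffers):
        pass

    def cleanup(self):
        pass


@dts_pipeline_component(
    display_name="RabbitMQ Source",
    description="Consumes messages from a RabbitMQ queue",  # hypothetical text
    component_type=ComponentType.SOURCE,
)
class RabbitMQSource(PipelineComponent):
    """Concrete overrides go here."""
```

A designer reading `RabbitMQSource.display_name` would get the text shown in the Toolbox, which is the role the attribute plays in SSDT BI.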
Implementing the Custom Source
The first method to override is ProvideComponentProperties. It is called when the source is added to a package, so all of the setup for the source component goes in here. In it I’m doing three things:
- Creating a new output
- Creating a custom property for the Queue name
- Creating a connection
One very important line, which took me a little while to figure out, was:
connection.ConnectionManagerID = "RabbitMQ";
A lot of other tutorials that I read online left this out; however, I found that without it my source didn’t work.
Basically, this line says “the package needs to contain a connection called RabbitMQ”. That connection will also need to be a RabbitMQConnectionManager, for reasons that will become obvious in the AcquireConnections method below.
(In the next post I will show how to create a custom user interface for this source that lets the user select the appropriate connection manager, which avoids the need to hard-code this value!)
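To illustrate why the ConnectionManagerID line matters, here is a small Python model (not SSIS code) of the three setup steps, plus a lookup that binds the runtime connection to a package connection by name, following the post's description of the line. The dataclasses, the "Output" name and the dictionary of package connections are assumptions made for the sketch.

```python
from dataclasses import dataclass, field


@dataclass
class RuntimeConnection:
    name: str
    # Identifies which package connection manager this slot binds to.
    connection_manager_id: str = ""


@dataclass
class ComponentMetadata:
    outputs: list = field(default_factory=list)
    custom_properties: dict = field(default_factory=dict)
    runtime_connections: list = field(default_factory=list)


def provide_component_properties(meta: ComponentMetadata) -> None:
    """Model of the three setup steps: create an output, a QueueName
    custom property, and a runtime connection."""
    meta.outputs.append({"name": "Output", "columns": []})
    meta.custom_properties["QueueName"] = ""  # filled in by the user later
    connection = RuntimeConnection(name="RabbitMQ")
    # The easily forgotten line: bind the slot to a connection named RabbitMQ.
    connection.connection_manager_id = "RabbitMQ"
    meta.runtime_connections.append(connection)


def resolve_connection_manager(meta: ComponentMetadata, package_connections: dict):
    """Return the package connection whose name matches the bound ID,
    or None if the package has no such connection."""
    wanted = meta.runtime_connections[0].connection_manager_id
    return package_connections.get(wanted)


meta = ComponentMetadata()
provide_component_properties(meta)
rabbit_manager = object()  # placeholder for a RabbitMQConnectionManager
print(resolve_connection_manager(meta, {"RabbitMQ": rabbit_manager}) is rabbit_manager)  # True
print(resolve_connection_manager(meta, {}))  # None
```

Leaving `connection_manager_id` empty would make the lookup fail even when a RabbitMQ connection exists, which matches the symptom the post describes.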
The CreateColumns method sets up the output by adding two columns.
The first column will contain the message contents, and the second will contain the routing key that was used to publish the message to RabbitMQ.
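A Python sketch of the two output columns follows. The column names, the choice of the DT_WSTR (Unicode string) type and the lengths are assumptions, not taken from the post; the lengths reflect two real limits: DT_WSTR columns hold at most 4000 characters, and AMQP routing keys are at most 255 bytes.

```python
from dataclasses import dataclass


@dataclass
class OutputColumn:
    name: str
    data_type: str  # SSIS type name, e.g. DT_WSTR (Unicode string)
    length: int


def create_columns(output_columns: list) -> None:
    """Model of CreateColumns: add a column for the message body and
    one for the routing key the message was published with."""
    # DT_WSTR columns can hold at most 4000 characters.
    output_columns.append(OutputColumn("MessageContents", "DT_WSTR", 4000))
    # AMQP routing keys are limited to 255 bytes.
    output_columns.append(OutputColumn("RoutingKey", "DT_WSTR", 255))


columns = []
create_columns(columns)
print([column.name for column in columns])  # ['MessageContents', 'RoutingKey']
```

If message bodies can exceed 4000 characters, a text-stream type such as DT_NTEXT would be the safer choice for the first column.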
Connecting to External Data Sources
As the source will be making use of a connection manager, there are two methods to override: AcquireConnections and ReleaseConnections. Both are called at design time for validation purposes and at runtime to establish a connection for use during package execution.
The AcquireConnections method looks for the ConnectionManager object on the connection that was created in the ProvideComponentProperties method. Once it has a reference to the connection manager, it checks that what has been returned is a RabbitMQConnectionManager. It then retrieves the queue name that was set on the source component and calls AcquireConnection on the connection manager itself. This attempts to establish a connection to the RabbitMQ broker and returns an IConnection (for more details, check out the first post). Finally, it stores the returned IConnection so that it can be used later in the component, including in the ReleaseConnections method.
The next method to override is ReleaseConnections, whose purpose is to clean up any connections created by the AcquireConnections method. All I am doing here is checking that rabbitMqConnectionManager has been set and then passing it the rabbitConnection variable to be released; this is the IConnection that was returned in the AcquireConnections method.
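The acquire/release pairing can be modelled in Python as below. This is only a sketch: the fake `RabbitMQConnectionManager` hands out a dict rather than an IConnection, and `SourceConnections` is a hypothetical holder for the state the C# component keeps in fields. It shows the type check, storing the connection, and releasing only what was acquired.

```python
class RabbitMQConnectionManager:
    """Fake version of the Part 1 connection manager: it hands out a
    dict instead of a real broker connection."""

    def __init__(self):
        self.open_connections = 0

    def acquire_connection(self, transaction=None):
        self.open_connections += 1
        return {"open": True}

    def release_connection(self, connection):
        connection["open"] = False
        self.open_connections -= 1


class SourceConnections:
    """Hypothetical holder for the source's connection state."""

    def __init__(self, connection_manager):
        # What the runtime connection slot resolved to (may be None).
        self.connection_manager = connection_manager
        self.rabbit_mq_connection_manager = None
        self.rabbit_connection = None

    def acquire_connections(self, transaction=None):
        manager = self.connection_manager
        if manager is None:
            return  # nothing bound yet, e.g. early at design time
        # Reject any manager that is not the RabbitMQ one.
        if not isinstance(manager, RabbitMQConnectionManager):
            raise TypeError("Connection manager must be a RabbitMQConnectionManager")
        self.rabbit_mq_connection_manager = manager
        # Keep the connection for later use and for release_connections.
        self.rabbit_connection = manager.acquire_connection(transaction)

    def release_connections(self):
        if self.rabbit_mq_connection_manager is not None:
            self.rabbit_mq_connection_manager.release_connection(self.rabbit_connection)
            # Forget both so a repeated call does nothing.
            self.rabbit_mq_connection_manager = None
            self.rabbit_connection = None


manager = RabbitMQConnectionManager()
source = SourceConnections(manager)
source.acquire_connections()
connection = source.rabbit_connection
source.release_connections()
print(connection, manager.open_connections)  # {'open': False} 0
```

Clearing the references after release is a choice made for the sketch; it keeps an extra ReleaseConnections call from releasing the same connection twice.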
Validating the Source
To ensure that the source has been correctly configured by the user, I have implemented the Validate method. All this does is check whether the user has set the queue name. For more details on the Validate method, see the MSDN documentation.
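The queue-name check can be sketched in Python as follows. `ValidationStatus` mirrors two real SSIS status names, VS_ISVALID and VS_ISBROKEN (the latter meaning the component is misconfigured in a way the user can fix by editing it), but the enum and function are hypothetical models, not the SSIS API.

```python
from enum import Enum


class ValidationStatus(Enum):
    """Two of the statuses an SSIS Validate override can return."""
    IS_VALID = "VS_ISVALID"
    IS_BROKEN = "VS_ISBROKEN"


def validate(custom_properties: dict) -> ValidationStatus:
    """Report the component as broken until a queue name is set."""
    if not custom_properties.get("QueueName"):
        # Missing or empty: the user must fix the configuration.
        return ValidationStatus.IS_BROKEN
    return ValidationStatus.IS_VALID


print(validate({"QueueName": ""}).value)        # VS_ISBROKEN
print(validate({"QueueName": "orders"}).value)  # VS_ISVALID
```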
Getting messages out of RabbitMQ and into SSIS
Now that all the dull setup bits are out of the way, it is time for the interesting part!
There are a number of methods that are executed as part of the SSIS pipeline; in this section I will be showing the PreExecute and PrimeOutput methods. The sequence of execution is:
- PrepareForExecute
- PreExecute
- PrimeOutput for a source and ProcessInput for a destination/transformation
- PostExecute
- Cleanup
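To make the run-time ordering concrete, here is a toy Python driver that calls a source's hooks once each in the order MSDN documents (PrepareForExecute, PreExecute, PrimeOutput, PostExecute, Cleanup) and records each call. `RecordingSource` and `run_source` are hypothetical; the real data flow engine also handles threading, buffers and connections, which this ignores.

```python
class RecordingSource:
    """Hypothetical source that only records which hook was called."""

    def __init__(self):
        self.calls = []

    def prepare_for_execute(self):
        self.calls.append("PrepareForExecute")

    def pre_execute(self):
        self.calls.append("PreExecute")

    def prime_output(self):
        self.calls.append("PrimeOutput")

    def post_execute(self):
        self.calls.append("PostExecute")

    def cleanup(self):
        self.calls.append("Cleanup")


def run_source(component):
    """Toy driver calling a source's run-time hooks in order.
    A destination or transformation would get ProcessInput instead
    of PrimeOutput, once per buffer it receives."""
    component.prepare_for_execute()
    component.pre_execute()
    component.prime_output()
    component.post_execute()
    component.cleanup()


source = RecordingSource()
run_source(source)
print(" -> ".join(source.calls))
```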
During execution the PreExecute method is called only once, so the MSDN documentation recommends placing as much logic in it as possible. In my PreExecute method I create the channel and consumer that will be used in the PrimeOutput method to retrieve messages from the RabbitMQ queue.
The PrimeOutput method is called after PreExecute, and it is where rows are added to the output buffer. The number of buffers in the buffers array is determined by the component's output collection (IDTSOutputCollection100). In the ProvideComponentProperties method at the beginning of this post, I added a single output to this collection.
In my PrimeOutput method, the component repeatedly retrieves messages from the RabbitMQ queue until there are none left.
For each message it retrieves, it calls the AddRow method on the buffer to create a new row, and then sets the message contents and the routing key of the message on that row.
Finally, it calls the SetEndOfRowset method to signal to the data flow engine that the component has finished adding rows.
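The PreExecute/PrimeOutput loop can be modelled in Python with an in-memory `queue.Queue` standing in for the broker and consumer. `Delivery`, `PipelineBuffer` and `RabbitMQSourceModel` are hypothetical; the buffer mimics SSIS semantics where `buffer[i] = value` writes to the row most recently added. Treating "nothing arrived within the timeout" as "the queue is empty" is an assumption of the sketch.

```python
import queue
from dataclasses import dataclass


@dataclass
class Delivery:
    body: bytes
    routing_key: str


class PipelineBuffer:
    """Minimal stand-in for an SSIS output buffer."""

    def __init__(self):
        self.rows = []
        self.end_of_rowset = False

    def add_row(self):
        self.rows.append([None, None])

    def __setitem__(self, column, value):
        # Like buffer[i] = value in SSIS: writes to the current (last) row.
        self.rows[-1][column] = value

    def set_end_of_rowset(self):
        self.end_of_rowset = True


class RabbitMQSourceModel:
    def __init__(self, broker_queue):
        self.broker_queue = broker_queue
        self.consumer = None

    def pre_execute(self):
        # Real component: create a channel and a consumer on the queue.
        # Here the "consumer" is simply the in-memory queue.
        self.consumer = self.broker_queue

    def prime_output(self, buffers, timeout=0.1):
        buffer = buffers[0]  # one output, so one buffer
        while True:
            try:
                delivery = self.consumer.get(timeout=timeout)
            except queue.Empty:
                break  # nothing arrived within the timeout: treat as drained
            buffer.add_row()
            buffer[0] = delivery.body.decode("utf-8")
            buffer[1] = delivery.routing_key
        # Tell the data flow engine no more rows are coming.
        buffer.set_end_of_rowset()


broker = queue.Queue()
broker.put(Delivery(b"hello", "orders.created"))
broker.put(Delivery(b"world", "orders.shipped"))
source = RabbitMQSourceModel(broker)
source.pre_execute()
out = PipelineBuffer()
source.prime_output([out])
print(out.rows, out.end_of_rowset)
```

Because the loop stops at the first timeout, a message published just after that moment is left for the next package run; the timeout length trades latency against how quickly the data flow finishes.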
That is enough to start getting messages out of RabbitMQ and into the SSIS package.
The last stage is to clean up after ourselves. In my Cleanup method I cancel the consumer that was created in the PreExecute method and then close the channel.
The ReleaseConnections method will be called by SSIS to release the connection to RabbitMQ.
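The cleanup ordering can be sketched with a fake channel in Python. `FakeChannel` and `SourceLifecycle` are hypothetical; their method names only echo the AMQP basic.consume and basic.cancel operations. The point is the order: cancel the consumer first, then close the channel, leaving the connection itself for ReleaseConnections.

```python
class FakeChannel:
    """Hypothetical stand-in for a RabbitMQ channel."""

    def __init__(self):
        self.consumers = {}  # consumer tag -> queue name
        self.is_open = True

    def basic_consume(self, queue_name):
        tag = f"ctag-{len(self.consumers) + 1}"
        self.consumers[tag] = queue_name
        return tag

    def basic_cancel(self, tag):
        self.consumers.pop(tag, None)

    def close(self):
        self.is_open = False


class SourceLifecycle:
    def __init__(self, channel):
        self.channel = channel
        self.consumer_tag = None

    def pre_execute(self, queue_name):
        # Register a consumer, as the real PreExecute does.
        self.consumer_tag = self.channel.basic_consume(queue_name)

    def cleanup(self):
        # Cancel first so the broker stops delivering to this consumer,
        # then close the channel it was registered on. The connection
        # is left for ReleaseConnections.
        if self.consumer_tag is not None:
            self.channel.basic_cancel(self.consumer_tag)
            self.consumer_tag = None
        if self.channel.is_open:
            self.channel.close()


channel = FakeChannel()
source = SourceLifecycle(channel)
source.pre_execute("orders")  # hypothetical queue name
source.cleanup()
print(channel.consumers, channel.is_open)  # {} False
```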
What it looks like
It is now time to run the package.
Below you can see the data flow tasks highlighted in the middle of the screen; to the right is the QueueName property, and at the bottom is the RabbitMQ Connection Manager.
Below you can see that the RabbitMQ Source has dequeued 2 messages, which are displayed in the data viewer. It has a tick indicating that it has finished dequeuing messages, which is correct because I only put two messages into the queue.
Up next I will be building a custom interface for configuring the RabbitMQ Source.