Contents

Part 3 - Building a Custom Source Component

So far in this series I have shown how to create a custom connection manager and also how to create a user friendly interface to configure and test it. In this post I’m going to outline the steps I went through to create a custom source component for SSIS. By the end of this post I will show messages that are in a RabbitMQ queue being consumed by an SSIS package.

Preparing the Solution

The first step is to create a new class library project in the solution. I called mine SSISRabbitMQ.RabbitMQSource. Then add references to the following assemblies:

  • Microsoft.SqlServer.Dts.Design
  • Microsoft.SqlServer.DTSPipelineWrap
  • Microsoft.SqlServer.DTSRuntimeWrap
  • Microsoft.SqlServer.ManagedDTS
  • Microsoft.SqlServer.PipelineHost

These can be found  in C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies

Then using Part 1 as a reference perform the following steps:

  • Add the RabbitMQ client library via NuGet
  • Add a post build step to copy the output assembly into the GAC

Then add a new class file called RabbitMQSource.cs into the project.

Creating the Custom Source

To have SSDT BI recognize the RabbitMQSource class as a source component, two things need to be done:

  • Apply the DtsPipelineComponent attribute
  • Extend and override methods from the PipelineComponent class (MSDN documentation)

The DtsPipelineComponent attribute is similar to the DtsConnection attribute which was applied to the custom connection manager in the first post. This attribute signals to SSDT BI that it is a custom component.

There are a number of properties that need to be set:

  • DisplayName - this is what will be displayed in the SSIS Toolbox in SSDT BI
  • Description - this will also be displayed in the SSIS Toolbox
  • ComponentType - this tells SSDT BI what kind of component the class is, (Source, Destination or Transformation)

After applying the attribute and extending the PipelineComponent class so far I have this code:

1
2
3
4
5
6
[DtsPipelineComponent(DisplayName = "RabbitMQ Source",
  ComponentType = ComponentType.SourceAdapter,
  Description = "Connection source for RabbitMQ")]
public class RabbitMQSource : PipelineComponent
{
}

Implementing the Custom Source

The first method to override is ProvideComponentProperties. This is the method which is called when the source is added to a package. In here goes all the setup for the source component. In the code below you can see I’m doing three things:

  1. Creating a new output
  2. Creating a custom property for the Queue name
  3. Creating a connection
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public override void ProvideComponentProperties()
{
  // Reset the component.
  base.RemoveAllInputsOutputsAndCustomProperties();
  ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
 
  IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
  output.Name = "Output";
 
  IDTSCustomProperty100 queueName = ComponentMetaData.CustomPropertyCollection.New();
  queueName.Name = "QueueName";
  queueName.Description = "The name of the RabbitMQ queue to read messages from";
 
  IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
  connection.Name = "RabbitMQ";
  connection.ConnectionManagerID = "RabbitMQ";
 
  CreateColumns();
}

Very important and something that took me a little while to figure out was the line:

connection.ConnectionManagerID = “RabbitMQ”;

Alot of other tutorials that I read online left this out, however I found that without it my source didn’t work.

Basically, this line is saying “In the package there needs to be a connection called RabbitMQ”, it will also need to be a RabbitMQConnectionManager which will become obvious in the AcquireConnections method below.

(In the next post I will be showing how to create a custom user interface for this source which will allow the user to select the appropriate connection manager, which will avoid the need to hard coding this value!)

The CreateColumns method looks like this:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private void CreateColumns()
{
  IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
 
  output.OutputColumnCollection.RemoveAll();
  output.ExternalMetadataColumnCollection.RemoveAll();
 
  IDTSOutputColumn100 column1 = output.OutputColumnCollection.New();
  IDTSExternalMetadataColumn100 exColumn1 = output.ExternalMetadataColumnCollection.New();
 
  IDTSOutputColumn100 column2 = output.OutputColumnCollection.New();
  IDTSExternalMetadataColumn100 exColumn2 = output.ExternalMetadataColumnCollection.New();
 
  column1.Name = "MessageContents";
  column1.SetDataTypeProperties(DataType.DT_WSTR, 4000, 0, 0, 0);
 
  column2.Name = "RoutingKey";
  column2.SetDataTypeProperties(DataType.DT_WSTR, 100, 0, 0, 0);
}

In here I am setting up the Output, adding two columns. The first will contain the message contents, and the second column will contain the routing key that was used to publish the message in RabbitMQ.

Connecting to External Data Sources

As the source will be making use of a connection manager there are two methods to override, these are the AcquireConnections and ReleaseConnections. Both the AcquireConnections and ReleaseConnections methods are called during design time for validation purposes and at runtime to establish a connection for use during package execution.

The AcquireConnections method will look for the ConnectionManager object on the connection that was created in the ProvideComponentProperties method. In the method below, once it has a reference to the connection manager (line 5 and 6) it checks that what has been returned is a RabbitMQConnectionManager (line 8 and 10). It then retrieves the queue name that was set on the source component and then calls the AcquireConnection on the connection manager itself. This will attempt to establish a connection to the RabbitMQ broker, which will return an IConnection. (For more details on this check out the first post.) It then stores the IConnection which was returned so it can be used later on in the component, as well as used in the ReleaseConnections method.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public override void AcquireConnections(object transaction)
{
  if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
  {
    ConnectionManager connectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(
      ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
 
    this.rabbitMqConnectionManager = connectionManager.InnerObject as RabbitMQConnectionManager.RabbitMQConnectionManager;
 
    if (this.rabbitMqConnectionManager == null)
      throw new Exception("Couldn't get the RabbitMQ connection manager, ");
 
this.queueName = ComponentMetaData.CustomPropertyCollection["QueueName"].Value;
 rabbitConnection = this.rabbitMqConnectionManager.AcquireConnection(transaction) as IConnection;
 }
}

The next method to override is the ReleaseConnections, whose purpose it is to do any cleanup on any connections created by the AcquireConnections method. All I am doing below is checking that the rabbitMqConnectionManager has been set, then passing the rabbitConnection variable into it to be released, this is the IConnection which was returned in the AcquireConnections method.

1
2
3
4
5
6
7
public override void ReleaseConnections()
{
  if (rabbitMqConnectionManager != null)
  {
    this.rabbitMqConnectionManager.ReleaseConnection(rabbitConnection);
  }
}

Validating the Source

To ensure that the source has been correctly configured by the user, I have implementation the Validate method. All this does is check if the user has set the queue name. For more details on the Validate method see the MSDN documentation.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public override DTSValidationStatus Validate()
{
  bool cancel;
  string qName = ComponentMetaData.CustomPropertyCollection["QueueName"].Value;
 
  if (string.IsNullOrWhiteSpace(qName))
  {
    //Validate that the QueueName property is set
    ComponentMetaData.FireError(0, ComponentMetaData.Name, "The QueueName property must be set", "", 0, out cancel);
    return DTSValidationStatus.VS_ISBROKEN;
  }
 
  return base.Validate();
}

Getting messages out of RabbitMQ and into SSIS

Now that all the dull setup bits are out of the way, it is time for the interesting part!

There are a number of methods that are executed as part of the SSIS Pipeline, in this section I will be showing the PreExecute and PrimeOutput methods. The sequence of execution is:

  1. PrepareForExecute
  2. PreExecute
  3. PrimeOutput for a source and ProcessInput for a destination/transformation
  4. PostExecute
  5. Cleanup

During execution the PreExecute method is called once, therefore the MSDN documentation recommends that you place as much logic in here as possible. In the PreExecute method below, I am creating the channel and consumer that will be used in the PrimeOutput method to retrieve messages from the RabbitMQ queue.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public override void PreExecute()
{
  try
  {
    this.consumerChannel = rabbitConnection.CreateModel();
    this.consumerChannel.QueueDeclare(queueName, true, false, false, null);
    this.queueConsumer = new QueueingBasicConsumer(this.consumerChannel);
    this.consumerTag = consumerChannel.BasicConsume(queueName, true, queueConsumer);
  }
  catch (Exception)
  {
    ReleaseConnections();
    throw;
  }
}

The PrimeOutput method is called after PreExecute, it is in here that rows are added to the output buffer. The number of output buffers in the buffers array is determined by the IDTSOutputCollection100. In the ProvideComponentProperties method at the beginning of this post I am adding a single output to this collection.

In the PrimeOutput method below, you can see that the component will continually retrieve messages from the RabbitMQ queue until there are none left.

Then for each message that it retrieves it calls the AddRow method on the buffer, this causes a new row to be created, it then sets the message contents and the routing key values of the message on the row.

Finally, the SetEndOfRowset method is called to indicate to the source that the component has finished adding rows.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
  IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
  PipelineBuffer buffer = buffers[0];
 
  object message;
  bool success;
 
  while (queueConsumer.IsRunning)
  {
    try
    {
      success = queueConsumer.Queue.Dequeue(100, out message);
    }
    catch (Exception)
    {
      break;
    }
 
    if (success)
    {
      BasicDeliverEventArgs e = (BasicDeliverEventArgs)message;
 
      var messageContent = System.Text.Encoding.UTF8.GetString(e.Body);
 
      buffer.AddRow();
      buffer[0] = messageContent;
      buffer[1] = e.RoutingKey;
    }
    else
    {
      break;
    }
  }
 
  buffer.SetEndOfRowset();
}

That is enough to start getting messages out of RabbitMQ and into the SSIS package.

Cleaning Up

The last stage is to clean up after ourselves. In the Cleanup method below I am cancelling the consumer that was created in the PreExecute method and then closing the channel.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public override void Cleanup()
{
  if (consumerChannel.IsOpen)
  {
    if (queueConsumer.IsRunning)
    {
      consumerChannel.BasicCancel(consumerTag);
    }
    consumerChannel.Close();
  }
  base.Cleanup();
}

The ReleaseConnections method will be called by SSIS to release the connection to RabbitMQ.

What it looks like

It is now time to run the package.

Below you can see the data flow tasks highlighted in the middle of the screen, to the right is the QueueName property and down the bottom is the RabbitMQ Connection Manager.

/part-3-building-a-custom-source-component/images/ssis-rabbitmq-source-1.png

Below you can see that the RabbitMQ Source has dequeued 2 messages, which are displayed in the data viewer, it has a tick indicating that it has finished dequeuing messages (this is correct because I only put two messages into the queue).

/part-3-building-a-custom-source-component/images/ssis-rabbitmq-source-2.png

Up next I will be building a custom interface for configuring the RabbitMQ Source.

🍪 I use Disqus for comments

Because Disqus requires cookies this site doesn't automatically load comments.

I don't mind about cookies - Show me the comments from now on (and set a cookie to remember my preference)