Asos.ServiceBus.MessageSiphon
1.2.32
Prefix Reserved
dotnet tool install --global Asos.ServiceBus.MessageSiphon --version 1.2.32
dotnet new tool-manifest # if you are setting up this repo dotnet tool install --local Asos.ServiceBus.MessageSiphon --version 1.2.32
#tool dotnet:?package=Asos.ServiceBus.MessageSiphon&version=1.2.32
nuke :add-package Asos.ServiceBus.MessageSiphon --version 1.2.32
azure-servicebus-message-siphon
A dotnet tool for message siphon scenarios when working with Azure Service Bus, allowing you to purge and replay messages, within a single namespace or across namespaces
What's it for?
You may have cases such as
- Messages have dead lettered and you want to replay them back onto the main queue\topic
- You want to clone messages from one entity to another in the same namespace
- You want to clone and remove messages from one namespace to another
- You want to select messages by a header value from one entity and move them to another
- You want to purge messages, deleting from a queue, topic or dead-letter sub entity.
- You want to use connection strings, RBAC or both for the source and targets. (i.e. connection string for source, RBAC for target)
How does it work?
A configuration file is used that determines the set of work that will be performed. The configuration file is made up
of service bus connection details, and a list of siphon work that needs to be performed. The siphon work then references
the connections by Name
As the consumer of this package, you can define some standard work
you'd like to perform as a set of configuration
files, then use them to execute the work against one or many service bus namespaces - just provide the
appropriate ConnectionString
or FullyQualifiedNamespace
at runtime
Configuration details
ReplayMessagesJob
- A replay messages just has the following
JobType
- Required. the type of work to perform. Can be one of :DeadLettersSameEntity
- replays messages from the sub-queue back onto the parent entityDeadLettersToDifferentEntity
- replays messages from the entity sub-queue, to a different Target entitySourceToTarget
- siphons messages from any source to any target entityPurgeSource
- purges messages from the source entity, a delete operationPurgeDeadLetters
- as per PurgeSource but for dead letter entities
JobName
- Required. Give the work a useful name, used in log messagesNumberOfConcurrentProcesses
- A value between 1 and 10. When performing a siphon job, copying messages fromsource
totarget
,- messages are received in batches as per the
SourceBatchReceiveSize
setting. Once messages are received, they need to be sent to the target andNumberOfConcurrentProcesses
controls how many concurrent operations attempt to work through the batch. E.g. if - your batch size is
100
and you setNumberOfConcurrentProcesses
to10
, then 10 operations will each attempt to process - 10 messages in parallel. Experiment with these settings to find the best combination to achieve high throughput.
ServiceBusDetails
- a list of service bus connections. A ServiceBusDetail instance has the following
Name
- give the item a useful name, you'll use this to refer to it from any siphon work you defineConnectionMode
- how to connect to the service bus, can be one ofConnectionString
orRbac
ConnectionString
- ifConnectionMode
isConnectionString
, then define the SAS connection string to useFullyQualifiedNamespace
= ifConnectionMode
isRbac
, then define the fully qualified namespace, e.g. namespace.servicebus.windows.net
SiphonWork - a list of work to perform, Siphon work requires the following
SiphonMode
- determines the type of operation to perform, which impacts the receive behaviour of the source messages- When
Clone
, the worker will usePeekMessagesAsync
so not to increment the delivery count of messages, and the message will not be received or completed. A copy of the message is created - When
CloneAndDelete
, the worker will useReceiveMessagesAsync
, and will attempt to complete the message once the send operation to target has completed OK - When
Delete
, the worker will use ReceiveMessagesAsync, and will attempt to complete the message once the it's received and no copy of a message will be created.
- When
SourceConnectionName
- theName
of the ServiceBusDetails to use as the sourceSourceEntity
- the name of the entity, can be either a queue or a topicSourceSubscriptions
- ifSourceEntity
is a topic, then define a list of subscription names for the TopicTargetConnectionName
- theName
of the ServiceBusDetails to use as the targetTargetEntity
- the name of the entity to send to, can be either a queue or a topicSourceBatchReceiveSize
- the size of the batch to receive messages from the source. Defaults to20
.MessagesOlderThan
- An ISO8601 date span, as per the duration specification. E.g. PT72H (72 hours), P30D (30 days). When performing a purge, only messages that have an older enqueue time than this value are considered, allowing you to perform work such aspurge messages older than 72 hours
HeaderSelector
- A filter to use on the message headers to select only matching messages. This has the following limitations- Only a single header filter is support - You can filter by one field only, e.g. HeaderName = Value
- Only =, > and < operators are supported
- If you use > or < operators, then the field selector must be numeric, e.g. HeaderName > 12345
Example configurations
Replaying service bus messages from dead letters on the subscriptions
In this configuration, we only need to define a single service bus connection, since we're replaying onto the same namespace/entity. Dead letters on the test and test1 subscriptions are received and put back onto the snapshot entity, both subscriptions would receive copies of the messages again
{
"Logging": {
"LogLevel": {
"Default": "Information"
}
},
"ReplayMessagesJob": {
"JobType": "DeadLettersSameEntity",
"JobName": "Replay-Messages-From-DeadLetter-Subqueue",
"NumberOfConcurrentProcesses": 1,
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "ConnectionString",
"ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-sas-key"
}
],
"SiphonWork": [
{
"SiphonMode": "CloneAndDelete",
"SourceConnectionName": "Source",
"SourceEntity": "snapshot",
"SourceSubscriptions": [ "test", "test1" ]
}
]
}
}
Cloning messages from a subscription in one namespace using a connection string, to another namespace using RBAC.
In this example, messages will be received in batches of 40
, and 5
concurrent tasks will be used to to send them to
the target in parallel (i.e. each task processes 8 messages each)
{
"Logging": {
"LogLevel": {
}
},
"ReplayMessagesJob": {
"JobType": "SourceToTarget",
"JobName": "Local Debug",
"NumberOfConcurrentProcesses": 5,
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "ConnectionString",
"ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=access-key"
},
{
"Name": "Target",
"ConnectionMode": "Rbac",
"FullyQualifiedNamespace": "namespace1.servicebus.windows.net"
}
],
"SiphonWork": [
{
"SiphonMode": "Clone",
"SourceConnectionName": "Source",
"SourceEntity": "topic-entity",
"SourceSubscriptions": [ "test-subscription" ],
"SourceBatchReceiveSize" : 40,
"TargetConnectionName": "Target",
"TargetEntity": "copy-of-topic"
}
]
}
}
Cloning messages from a subscription in one namespace to another topic in the same namespace, using a message selector and age to filter .
In this example, only messages that have header name Payload.MessageType
with a value of ProductUpdated
which are older than 1 hours will be selected
and will be copied from topic-entity
to copy-of-topic
{
"Logging": {
"LogLevel": {
}
},
"ReplayMessagesJob": {
"JobType": "SourceToTargetByMessageSelector",
"JobName": "Local Debug",
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "ConnectionString",
"ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=access-key"
}
],
"SiphonWork": [
{
"SiphonMode": "Clone",
"SourceConnectionName": "Source",
"SourceEntity": "topic-entity",
"SourceSubscriptions": [ "test-subscription" ],
"HeaderSelector": "[Payload.MessageType] = 'ProductUpdated'",
"SourceBatchReceiveSize" : 40,
"TargetConnectionName": "Source",
"TargetEntity": "copy-of-topic",
"MessagesOlderThan" : "PT1H"
}
]
}
}
Using RBAC, receive messages from a dead letter subscription in batches of 100 and purge messages that are older than 1 hour
{
"Logging": {
"LogLevel": {
"Default": "Information"
}
},
"ReplayMessagesJob": {
"JobType": "PurgeDeadLetters",
"JobName": "Purge Debug",
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "Rbac",
"FullyQualifiedNamespace": "namespace.servicebus.windows.net"
}
],
"SiphonWork": [
{
"SiphonMode": "Delete",
"SourceConnectionName": "Source",
"SourceEntity": "demo",
"SourceSubscriptions": [ "test" ],
"SourceBatchReceiveSize" : 100,
"MessagesOlderThan" : "PT1H"
}
]
}
}
Move messages from a multiple topic subscriptions in one namespace using RBAC, to another namespace using RBAC.
In this example, messages will be received in batches of 40
, and 5
concurrent tasks will be used to to send them to
the target in parallel (i.e. each task processes 8 messages each)
{
"Logging": {
"LogLevel": {
}
},
"ReplayMessagesJob": {
"JobType": "SourceToTarget",
"JobName": "Local Debug",
"NumberOfConcurrentProcesses": 5,
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "Rbac",
"ConnectionString": "namespace1.servicebus.windows.net"
},
{
"Name": "Target",
"ConnectionMode": "Rbac",
"FullyQualifiedNamespace": "namespace2.servicebus.windows.net"
}
],
"SiphonWork": [
{
"SiphonMode": "CloneAndDelete",
"SourceConnectionName": "Source",
"SourceEntity": "topic-1-entity",
"SourceSubscriptions": [ "subscription-A" ],
"SourceBatchReceiveSize" : 10,
"TargetConnectionName": "Target",
"TargetEntity": "copy-of-topic-1"
},
{
"SiphonMode": "CloneAndDelete",
"SourceConnectionName": "Source",
"SourceEntity": "topic-2-entity",
"SourceSubscriptions": [ "subscription-B" ],
"SourceBatchReceiveSize" : 10,
"TargetConnectionName": "Target",
"TargetEntity": "copy-of-topic-2"
}
]
}
}
Add as much work as you think is sensible and each piece of work is executed concurrently.
How to use?
The package is available as a dotnet tool, so you can install from nuget.org by running the following command
dotnet tool update --global Asos.ServiceBus.MessageSiphon ---version x.x.x
Once installed, you have a command available to you siphon-asb-messages
. This requires a single parameter, the path to
the configuration file, e.g.
siphon-asb-messages -n D:\temp\replay-config.json
siphon-asb-messages -n usr/tmp/replay-config.json
You might define a set of files that represent common scenarios for you, which can be easily referenced.
Azure Pipelines
Since this is a dotnet tool, you can run it from your Azure Pipeline on any platform. This lets you take advantage of Managed Identity from Service Connections, and interact with namespaces that may be IP Restricted by running the work from a build pool that is allow-listed on the namespace.
This gives you a way to allow supporting team members to run jobs, replaying or purging messages, in a secure and repeatable manner & without requiring any particular tools or permissions.
For example, another team may have a dependency on a service bus that you own and maintain. The namespace is IP restricted and access is via RBAC - they need to replay some dead letters on a topic-subscription. You can define an Azure pipeline to do this work and define a particular build pool with a known IP, then grant their team members permissions to run that pipeline only. In this way, the consuming team can self-serve without sharing connection strings or having to use any other Bastion type services
To do this, you'd need to define a configuration file in your source repository. Note, we use a variable placeholder
here for $ServiceBusConnectionString
{
"Logging": {
"LogLevel": {
"Default": "Information"
}
},
"ReplayMessagesJob": {
"JobType": "DeadLettersSameEntity",
"JobName": "Dead Letters Replay",
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "ConnectionString",
"FullyQualifiedNamespace": "$(ServiceBusConnectionString)"
}
],
"SiphonWork": [
{
"SiphonMode": "CloneAndDelete",
"SourceConnectionName": "Source",
"SourceEntity": "some-topic-orders",
"SourceSubscriptions": [
"the-subscription-name"
]
}
]
}
}
An example pipeline might look like so. In this example, we support either RBAC using a fully qualified namespace, or
connection strings by retrieving a secret from a vault. Replace Tokens
will update the config file with the
appropriate values
parameters:
- name: StageName
displayName: Service Bus Connection String Secret
type: string
default: "ReplayPurgeMessages"
- name: AzureSubscription
displayName: The name of the service connection that's used to connect to Azure
type: string
- name: ConfigPath
displayName: Path to config
type: string
- name: ConfigFileName
displayName: Message replay config
type: string
- name: Environment
displayName: Environment
type: string
- name: ServiceBusFullyQualifiedName
displayName: ServiceBusFullyQualifiedName
type: string
default: ""
- name: KeyVaultSubscriptionName
displayName: Name of the subscription that contains the key vault, if using it to retrieve connection strings
type: string
default: ""
- name: KeyVaultName
displayName: Key Vault Name
type: string
default: ""
- name: SecretName
displayName: Service Bus Connection String Secret
type: string
default: ""
stages:
- stage: ${{parameters.StageName}}
displayName: ${{parameters.StageName}}
pool:
vmImage: ubuntu-latest
jobs:
- job:
steps:
- checkout: self
- task: AzureCLI@2
displayName: Get Service Bus Connection String From Vault
condition: and(succeeded(), ne('${{ parameters.KeyVaultName }}', ''))
inputs:
azureSubscription: ${{parameters.AzureSubscription}}
scriptType: pscore
scriptLocation: inlineScript
inlineScript: |
$subscription = ${{parameters.KeyVaultSubscriptionName}}
$connectionString = az keyvault secret show --name ${{parameters.SecretName}} --subscription $subscription --vault-name ${{parameters.KeyVaultName}} --query value -o tsv
echo "##vso[task.setvariable variable=ServiceBusConnectionString]$connectionString"
- task: AzureCLI@2
displayName: Set variable to fully qualified namespace parameter
condition: and(succeeded(), eq('${{ parameters.KeyVaultName }}', ''))
inputs:
azureSubscription: ${{parameters.AzureSubscription}}
scriptType: pscore
scriptLocation: inlineScript
inlineScript: |
$namespaceName = ${{parameters.ServiceBusFullyQualifiedName}}
echo "Connecting to namespace " + $namespaceName
echo "##vso[task.setvariable variable=ServiceBusFullyQualifiedName]$namespaceName"
- task: replacetokens@4
displayName: Replace Tokens
inputs:
targetFiles: '${{parameters.ConfigPath}}/*'
encoding: 'auto'
tokenPattern: 'azpipelines'
writeBOM: true
actionOnMissing: 'warn'
keepToken: false
actionOnNoFiles: 'continue'
enableTransforms: false
useLegacyPattern: false
enableTelemetry: true
- task: AzureCLI@2
displayName: Perform Message Work
inputs:
azureSubscription: ${{parameters.AzureSubscription}}
scriptType: pscore
scriptLocation: inlineScript
inlineScript: |
dotnet tool update Asos.ServiceBus.MessageSiphon --tool-path . --version 1.20.0
& "./siphon-asb-messages" -n ${{parameters.ConfigPath}}/${{parameters.ConfigFileName}}
AKS Cronjob
Another option is to run the Message Siphon as an AKS Cronjob. This will start the tool at an interval defined in a Cron expression.
To achieve this you need to:
- Create a DevOps repository in your area, or use an existing one.
- Copy the
charts/values.yaml
from this repo and change the commented values to be correct for your platform - Create a configuration file for the message siphon tool using the guidance above - an example is present in the
.azdo/configuration
folder. - Create a DevOps Pipeline to deploy the application to the AKS cluster. An example pipeline has been provided in the
.azdo
folder which reads the configuration file into a DevOps variable, substitutes the values and then uses the Asos Core helm charts with thevalues.yaml
to deploy the component to the cluster.
Limitations of the Cronjob
Using RBAC access to Service Bus connections is not supported at present. This will be resolved once Workload Identity becomes available. Access must therefore be achieved using Connection Strings. The example pipeline provided assumes that the connection string is stored in KeyVault and accessed at deployment time via a DevOps library variable.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
This package has no dependencies.
Version | Downloads | Last updated |
---|---|---|
1.2.32 | 115 | 7/8/2024 |
1.2.31 | 102 | 7/1/2024 |
1.2.30 | 94 | 6/24/2024 |
1.2.29 | 79 | 6/24/2024 |
1.2.28 | 111 | 6/17/2024 |
1.2.27 | 85 | 6/12/2024 |
1.2.26 | 88 | 6/10/2024 |
1.2.25 | 75 | 6/4/2024 |
1.2.24 | 96 | 6/3/2024 |
1.2.23 | 93 | 5/28/2024 |
1.2.22 | 95 | 5/20/2024 |
1.2.21 | 110 | 5/20/2024 |
1.2.20 | 65 | 5/13/2024 |
1.2.19 | 81 | 5/13/2024 |
1.2.18 | 131 | 5/6/2024 |
1.2.17 | 116 | 4/29/2024 |
1.2.16 | 109 | 4/22/2024 |
1.2.15 | 86 | 4/22/2024 |
1.2.14 | 145 | 4/17/2024 |
1.2.12 | 196 | 4/12/2024 |
1.1.35 | 226 | 1/30/2024 |
1.1.34 | 206 | 1/26/2024 |
1.1.33 | 183 | 1/8/2024 |
1.1.26 | 228 | 12/6/2023 |
1.1.24 | 136 | 12/4/2023 |
1.1.23 | 139 | 12/4/2023 |
1.1.22 | 164 | 12/4/2023 |
1.1.21 | 171 | 12/4/2023 |
1.1.20 | 157 | 11/27/2023 |
1.1.19 | 179 | 11/20/2023 |
1.1.18 | 192 | 11/13/2023 |
1.1.17 | 244 | 11/6/2023 |
1.1.16 | 210 | 10/23/2023 |
1.1.15 | 257 | 10/17/2023 |
1.1.14 | 242 | 10/17/2023 |
1.1.13 | 242 | 10/16/2023 |
1.1.12 | 221 | 10/16/2023 |
1.1.11 | 208 | 10/2/2023 |
1.1.10 | 251 | 9/25/2023 |
1.1.9 | 236 | 9/14/2023 |
1.1.8 | 173 | 6/28/2023 |
1.1.7 | 292 | 2/16/2023 |
1.1.6 | 241 | 2/13/2023 |
1.1.5 | 229 | 2/7/2023 |
1.1.4 | 273 | 2/2/2023 |
1.1.2 | 309 | 12/13/2022 |
1.1.1 | 288 | 12/13/2022 |
1.1.0 | 313 | 12/13/2022 |
1.0.24 | 301 | 12/13/2022 |
1.0.23 | 296 | 12/12/2022 |
1.0.22 | 299 | 12/9/2022 |
1.0.21 | 424 | 8/5/2022 |
1.0.21-pr0026-0011 | 271 | 8/4/2022 |
1.0.20 | 428 | 8/3/2022 |
1.0.20-pr0024-0004 | 260 | 8/2/2022 |
1.0.20-ci-message-selec0003 | 299 | 8/2/2022 |
1.0.19 | 495 | 1/24/2022 |