azure-storage-queue-py
Cloud, DevOps & Systèmes|
Documentation
Azure Queue Storage SDK for Python
Simple, cost-effective message queuing for asynchronous communication.
Installation
pip install azure-storage-queue azure-identityEnvironment Variables
AZURE_STORAGE_ACCOUNT_URL=https://<account>.queue.core.windows.netAuthentication
from azure.identity import DefaultAzureCredential
from azure.storage.queue import QueueServiceClient, QueueClient
credential = DefaultAzureCredential()
account_url = "https://<account>.queue.core.windows.net"
# Service client
service_client = QueueServiceClient(account_url=account_url, credential=credential)
# Queue client
queue_client = QueueClient(account_url=account_url, queue_name="myqueue", credential=credential)Queue Operations
# Create queue
service_client.create_queue("myqueue")
# Get queue client
queue_client = service_client.get_queue_client("myqueue")
# Delete queue
service_client.delete_queue("myqueue")
# List queues
for queue in service_client.list_queues():
print(queue.name)Send Messages
# Send message (string)
queue_client.send_message("Hello, Queue!")
# Send with options
queue_client.send_message(
content="Delayed message",
visibility_timeout=60, # Hidden for 60 seconds
time_to_live=3600 # Expires in 1 hour
)
# Send JSON
import json
data = {"task": "process", "id": 123}
queue_client.send_message(json.dumps(data))Receive Messages
# Receive messages (makes them invisible temporarily)
messages = queue_client.receive_messages(
messages_per_page=10,
visibility_timeout=30 # 30 seconds to process
)
for message in messages:
print(f"ID: {message.id}")
print(f"Content: {message.content}")
print(f"Dequeue count: {message.dequeue_count}")
# Process message...
# Delete after processing
queue_client.delete_message(message)Peek Messages
# Peek without hiding (doesn't affect visibility)
messages = queue_client.peek_messages(max_messages=5)
for message in messages:
print(message.content)Update Message
# Extend visibility or update content
messages = queue_client.receive_messages()
for message in messages:
# Extend timeout (need more time)
queue_client.update_message(
message,
visibility_timeout=60
)
# Update content and timeout
queue_client.update_message(
message,
content="Updated content",
visibility_timeout=60
)Delete Message
# Delete after successful processing
messages = queue_client.receive_messages()
for message in messages:
try:
# Process...
queue_client.delete_message(message)
except Exception:
# Message becomes visible again after timeout
passClear Queue
# Delete all messages
queue_client.clear_messages()Queue Properties
# Get queue properties
properties = queue_client.get_queue_properties()
print(f"Approximate message count: {properties.approximate_message_count}")
# Set/get metadata
queue_client.set_queue_metadata(metadata={"environment": "production"})
properties = queue_client.get_queue_properties()
print(properties.metadata)Async Client
from azure.storage.queue.aio import QueueServiceClient, QueueClient
from azure.identity.aio import DefaultAzureCredential
async def queue_operations():
credential = DefaultAzureCredential()
async with QueueClient(
account_url="https://<account>.queue.core.windows.net",
queue_name="myqueue",
credential=credential
) as client:
# Send
await client.send_message("Async message")
# Receive
async for message in client.receive_messages():
print(message.content)
await client.delete_message(message)
import asyncio
asyncio.run(queue_operations())Base64 Encoding
from azure.storage.queue import QueueClient, BinaryBase64EncodePolicy, BinaryBase64DecodePolicy
# For binary data
queue_client = QueueClient(
account_url=account_url,
queue_name="myqueue",
credential=credential,
message_encode_policy=BinaryBase64EncodePolicy(),
message_decode_policy=BinaryBase64DecodePolicy()
)
# Send bytes
queue_client.send_message(b"Binary content")Best Practices
dequeue_count for poison message detectionpeek_messages for monitoring without affecting queuetime_to_live to prevent stale messagesCompétences similaires
Explorez d'autres agents de la catégorie Cloud, DevOps & Systèmes
service-mesh-observability
Implement comprehensive observability for service meshes including distributed tracing, metrics, and visualization. Use when setting up mesh monitoring, debugging latency issues, or implementing SLOs for service communication.
azure-mgmt-mongodbatlas-dotnet
Manage MongoDB Atlas Organizations as Azure ARM resources using Azure.ResourceManager.MongoDBAtlas SDK. Use when creating, updating, listing, or deleting MongoDB Atlas organizations through Azure Marketplace integration. This SDK manages the Azure-side organization resource, not Atlas clusters/databases directly.
azure-search-documents-ts
Build search applications using Azure AI Search SDK for JavaScript (@azure/search-documents). Use when creating/managing indexes, implementing vector/hybrid search, semantic ranking, or building agentic retrieval with knowledge bases.