azure-eventhub-ts
Cloud, DevOps & SystèmesBuild event streaming applications using Azure Event Hubs SDK for JavaScript (@azure/event-hubs). Use when implementing high-throughput event ingestion, real-time analytics, IoT telemetry, or event-driven architectures with partitioned consumers.
Documentation
Azure Event Hubs SDK for TypeScript
High-throughput event streaming and real-time data ingestion.
Installation
npm install @azure/event-hubs @azure/identityFor checkpointing with consumer groups:
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blobEnvironment Variables
EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpointsAuthentication
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";
const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();
// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
// Consumer
const consumer = new EventHubConsumerClient(
"$Default", // Consumer group
fullyQualifiedNamespace,
eventHubName,
credential
);Core Workflow
Send Events
const producer = new EventHubProducerClient(namespace, eventHubName, credential);
// Create batch and add events
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });
await producer.sendBatch(batch);
await producer.close();Send to Specific Partition
// By partition ID
const batch = await producer.createBatch({ partitionId: "0" });
// By partition key (consistent hashing)
const batch = await producer.createBatch({ partitionKey: "device-123" });Receive Events (Simple)
const consumer = new EventHubConsumerClient("$Default", namespace, eventHubName, credential);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
}
},
processError: async (err, context) => {
console.error(`Error on partition ${context.partitionId}: ${err.message}`);
},
});
// Stop after some time
setTimeout(async () => {
await subscription.close();
await consumer.close();
}, 60000);Receive with Checkpointing (Production)
import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
const containerClient = new ContainerClient(
`https://${storageAccount}.blob.core.windows.net/${containerName}`,
credential
);
const checkpointStore = new BlobCheckpointStore(containerClient);
const consumer = new EventHubConsumerClient(
"$Default",
namespace,
eventHubName,
credential,
checkpointStore
);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Processing: ${JSON.stringify(event.body)}`);
}
// Checkpoint after processing batch
if (events.length > 0) {
await context.updateCheckpoint(events[events.length - 1]);
}
},
processError: async (err, context) => {
console.error(`Error: ${err.message}`);
},
});Receive from Specific Position
const subscription = consumer.subscribe({
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
}, {
startPosition: {
// Start from beginning
"0": { offset: "@earliest" },
// Start from end (new events only)
"1": { offset: "@latest" },
// Start from specific offset
"2": { offset: "12345" },
// Start from specific time
"3": { enqueuedOn: new Date("2024-01-01") },
},
});Event Hub Properties
// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);
// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);Batch Processing Options
const subscription = consumer.subscribe(
{
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
},
{
maxBatchSize: 100, // Max events per batch
maxWaitTimeInSeconds: 30, // Max wait for batch
}
);Key Types
import {
EventHubProducerClient,
EventHubConsumerClient,
EventData,
ReceivedEventData,
PartitionContext,
Subscription,
SubscriptionEventHandlers,
CreateBatchOptions,
EventPosition,
} from "@azure/event-hubs";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";Event Properties
// Send with properties
const batch = await producer.createBatch();
batch.tryAdd({
body: { data: "payload" },
properties: {
eventType: "telemetry",
deviceId: "sensor-1",
},
contentType: "application/json",
correlationId: "request-123",
});
// Access in receiver
consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Type: ${event.properties?.eventType}`);
console.log(`Sequence: ${event.sequenceNumber}`);
console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
console.log(`Offset: ${event.offset}`);
}
},
});Error Handling
consumer.subscribe({
processEvents: async (events, context) => {
try {
for (const event of events) {
await processEvent(event);
}
await context.updateCheckpoint(events[events.length - 1]);
} catch (error) {
// Don't checkpoint on error - events will be reprocessed
console.error("Processing failed:", error);
}
},
processError: async (err, context) => {
if (err.name === "MessagingError") {
// Transient error - SDK will retry
console.warn("Transient error:", err.message);
} else {
// Fatal error
console.error("Fatal error:", err);
}
},
});Best Practices
1.Use checkpointing - Always checkpoint in production for exactly-once processing
2.Batch sends - Use
createBatch() for efficient sending3.Partition keys - Use partition keys to ensure ordering for related events
4.Consumer groups - Use separate consumer groups for different processing pipelines
5.Handle errors gracefully - Don't checkpoint on processing failures
6.Close clients - Always close producer/consumer when done
7.Monitor lag - Track
lastEnqueuedSequenceNumber vs processed sequenceCompétences similaires
Explorez d'autres agents de la catégorie Cloud, DevOps & Systèmes