Dive into AWS Real-Time Data: Your First Concrete Step Today

AWS Real-Time Data: The Essentials in One Article — Real Code, Diagrams and Concrete Steps, Excerpts from a 14-Lesson Course.

The best way to learn AWS Real-Time Data is by doing. This article gives you a head start with practical excerpts from a 14-lesson course — enough to get your first results today.

tl;dr
  • Introduction to AWS Data Streaming
  • Amazon Kinesis
  • Amazon OpenSearch
  • Security and Encryption
  • Kafka vs Kinesis and Alternatives
~$ cat ./parcours.md # AWS Real-Time Data: 6 chapters

01 Introduction to AWS Data Streaming
  → Chapter 00 – Lesson 1: Real-Time Data vs Batch Processing
  → Chapter 00 – Lesson 2: The AWS Ecosystem for Data Streaming
02 Amazon Kinesis
  → Chapter 01 – Lesson 1: Amazon Kinesis Data Streams — Shards and Consumers
  → Chapter 01 – Lesson 2: Kinesis Firehose — Managed Delivery and Transformation
  + 1 more lesson
03 Amazon OpenSearch
  → Chapter 02 – Lesson 1: Amazon OpenSearch — Indexing and Real-Time Search
  → Chapter 02 – Lesson 2: Amazon Cognito — Authentication for OpenSearch Dashboards
  + 1 more lesson
04 Security and Encryption
  → Chapter 03 – Lesson 1: AWS KMS — Data Encryption in Kinesis and Firehose
  → Chapter 03 – Lesson 2: IAM, VPC and Security Best Practices for Streaming
05 Kafka vs Kinesis and Alternatives
  → Chapter 04 – Lesson 1: Apache Kafka vs Amazon Kinesis — Complete Comparison
  → Chapter 04 – Lesson 2: Kafka Migration to Kinesis / MSK and Hybrid Patterns
06 Reference Architectures
  → Chapter 05 – Lesson 1: Reference Architecture — Medical IoT Wearables
  → Chapter 05 – Lesson 2: Reference Architecture — Real-Time E-Commerce Fraud Detection
🏁 Final project
  → You leave with a concrete, demonstrable project

Chapter 01 – Lesson 1: Amazon Kinesis Data Streams — Shards and Consumers

NOTE (Objective): Understand the internal architecture of Kinesis Data Streams: how shards enable parallelism, how producers publish data, and how consumers process it, with and without Enhanced Fan-Out.

1. Shard Architecture — The Basic Unit

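NOTE (Shard): A shard is an isolated unit of capacity within a Kinesis stream. Each shard ingests up to 1 MB/s or 1,000 records/s and delivers up to 2 MB/s on output. That read throughput is shared among all standard consumers; only Enhanced Fan-Out gives each registered consumer its own 2 MB/s.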
python
# Capacity per shard
# ─────────────────────────────────────
# Write (PUT):  1 MB/s  OR  1,000 records/s  (whichever limit is reached first)
# Read  (GET):  2 MB/s  per shard, shared among ALL standard consumers
#
# → For 5 MB/s ingestion: you need 5 shards
# → For 10,000 records/s: you need 10 shards

# Calculate the number of shards required
import math

max_ingestion_rate = 5    # MB/s
max_record_rate    = 3000 # records/s

# Round up: a fractional shard does not exist
shards_for_volume = math.ceil(max_ingestion_rate / 1)    # = 5 shards
shards_for_rate   = math.ceil(max_record_rate / 1000)    # = 3 shards

# The stricter of the two constraints decides
nb_shards = max(shards_for_volume, shards_for_rate)
print(f"Shards required: {nb_shards}")  # → 5 shards

Partition Key — How Data Is Distributed

NOTE (Partition Key): Each record has a partition key. Kinesis applies an MD5 hash function to this key to determine which shard the record is routed to. Records with the same partition key always go to the same shard, so their order is preserved.
python
# Examples of partition keys and their impact
import json, uuid
import boto3

kinesis = boto3.client('kinesis', region_name='ca-central-1')
payload = {"device_id": "WATCH-001", "heart_rate": 72}  # Sample record

# ✅ GOOD: high-cardinality key → even distribution across shards
# device_id = "WATCH-001", "WATCH-002", ..., "WATCH-10000"
kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(payload).encode('utf-8'),
    PartitionKey=payload['device_id']   # Thousands of unique values
)

# ⚠️ PROBLEM: low-cardinality key → "hot shard" (one overloaded shard)
# If all records use the same key:
kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(payload).encode('utf-8'),
    PartitionKey='donnees-sante'   # ← Always the same shard! Bottleneck
)

# ✅ FALLBACK if there is no natural key: random key
kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(payload).encode('utf-8'),
    PartitionKey=str(uuid.uuid4())   # Random key → near-uniform distribution
    # Warning: records from one device land on different shards,
    # so their relative order is no longer guaranteed
)
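
To make the routing rule concrete, here is a minimal sketch of how an MD5 hash can map a partition key to a shard. It assumes the 128-bit hash space is split into equal contiguous ranges, which is typically the case for a newly created stream but may no longer hold after resharding. `shard_for_key` and `distribution` are hypothetical helpers written for this illustration, not part of boto3.

```python
import hashlib
from collections import Counter

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def shard_for_key(partition_key: str, num_shards: int) -> int:
    """Return the index of the shard whose hash range contains the key.

    Assumption: the hash space is divided into num_shards equal ranges.
    """
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest, "big")
    return hash_value * num_shards // HASH_SPACE


def distribution(keys, num_shards: int) -> Counter:
    """Count how many keys land on each shard index."""
    return Counter(shard_for_key(k, num_shards) for k in keys)


device_keys = [f"WATCH-{i:05d}" for i in range(10_000)]
print(distribution(device_keys, 4))                 # spread over all 4 shards
print(distribution(["donnees-sante"] * 10_000, 4))  # everything on one shard
```

The second call shows the hot-shard effect: a constant key always hashes to the same value, so adding shards does not help.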

2. Producers — Sending Data

put_record vs put_records

python
import boto3, json, time

kinesis = boto3.client('kinesis', region_name='ca-central-1')

# Method 1: put_record — single record
# Latency: ~70ms per call
donnee = {
    "device_id": "WATCH-001",
    "heart_rate": 72,
    "timestamp": int(time.time())
}
response = kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(donnee).encode('utf-8'),
    PartitionKey=donnee['device_id']
)
print(f"ShardId: {response['ShardId']}")

# ─────────────────────────────────────

# Method 2: put_records — multiple records in batch (RECOMMENDED)
# Latency: ~70ms for up to 500 records!
# Limit: max 500 records or 5MB per call
donnees = [
    {"device_id": f"WATCH-{i:03d}", "heart_rate": 60 + i, "timestamp": int(time.time())}
    for i in range(10)
]

records = [
    {
        'Data': json.dumps(d).encode('utf-8'),
        'PartitionKey': d['device_id']
    }
    for d in donnees
]

response = kinesis.put_records(
    StreamName='donnees-sante',
    Records=records
)

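# response['Records'] holds one entry per submitted record, failed or not
sent = len(response['Records']) - response['FailedRecordCount']
print(f"Records OK: {sent} sent")
print(f"Failures: {response['FailedRecordCount']}")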

# Handle partial failures (put_records can have partial successes)
failed_records = [
    records[i] for i, r in enumerate(response['Records'])
    if 'ErrorCode' in r
]
if failed_records:
    print(f"Retry needed for {len(failed_records)} records")
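
The block above detects partial failures but does not resend anything. One possible way to close that gap is a retry loop with exponential backoff, sketched below. `put_records_with_retry`, `max_attempts`, and `base_delay` are hypothetical names chosen for this example; the only boto3 call used is `put_records`, and the caller is still responsible for keeping each batch within the 500-record limit.

```python
import time


def put_records_with_retry(client, stream_name, records, max_attempts=5, base_delay=0.1):
    """Send records with put_records, retrying only the entries that failed.

    Returns the list of records still unsent after max_attempts.
    """
    pending = list(records)
    for attempt in range(max_attempts):
        if not pending:
            break
        response = client.put_records(StreamName=stream_name, Records=pending)
        # Result entries come back in the same order as the request entries
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        if pending and attempt < max_attempts - 1:
            time.sleep(base_delay * (2 ** attempt))  # 0.1 s, 0.2 s, 0.4 s, ...
    return pending
```

With the `kinesis` client and `records` list defined earlier, a call would look like `leftover = put_records_with_retry(kinesis, 'donnees-sante', records)`. Anything left in `leftover` should be logged or routed to a fallback store rather than dropped silently.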

3. Consumers — Reading Data

Standard Mode (GetRecords)

NOTE (GetRecords, standard mode): All consumers share the 2 MB/s per-shard read limit. With 3 consumers on 1 shard, each gets only about 667 KB/s.
python
# Standard consumer with GetRecords
import boto3, json, time

kinesis = boto3.client('kinesis', region_name='ca-central-1')
STREAM_NAME = 'donnees-sante'

# 1. List shards
response = kinesis.describe_stream(StreamName=STREAM_NAME)
shards = response['StreamDescription']['Shards']

for shard in shards:
    shard_id = shard['ShardId']

    # 2. Get a shard iterator (LATEST = new data only)
    iterator_response = kinesis.get_shard_iterator(
        StreamName=STREAM_NAME,
        ShardId=shard_id,
        ShardIteratorType='LATEST'  # Or 'TRIM_HORIZON' to read from the beginning
    )
    shard_iterator = iterator_response['ShardIterator']

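    # 3. Read loop
    # Note: this loop blocks on the current shard. In production, run one
    # worker per shard (threads, or the Kinesis Client Library).
    while shard_iterator:
        records_response = kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=100  # Cap this call at 100 records (API maximum: 10,000 records or 10 MB)
        )

        for record in records_response['Records']:
            data = json.loads(record['Data'].decode('utf-8'))
            print(f"Device: {data['device_id']}, HR: {data['heart_rate']}")

        # NextShardIterator is absent once a closed shard has been fully read
        shard_iterator = records_response.get('NextShardIterator')

        # Limit: 5 GetRecords calls per second per shard, so always pause
        # between calls, and pause longer when there is no new data
        time.sleep(1 if not records_response['Records'] else 0.2)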

Enhanced Fan-Out — Dedicated Consumers

TIP (Enhanced Fan-Out): Each registered consumer gets its own 2 MB/s of read throughput per shard instead of sharing it. Records are pushed over HTTP/2 rather than polled. Ideal for critical applications that need low latency (typically around 70 ms).
bash
# Register an Enhanced Fan-Out consumer
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante \
    --consumer-name alertes-medicales-temps-reel

# List registered consumers
aws kinesis list-stream-consumers \
    --stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante
python
# Consumer with Enhanced Fan-Out (SubscribeToShard)
import boto3, json

kinesis = boto3.client('kinesis', region_name='ca-central-1')

STREAM_ARN     = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante'
# Must be the ARN of the consumer registered above (name + creation timestamp)
CONSUMER_ARN   = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante/consumer/alertes-medicales-temps-reel:1234567890'
SHARD_ID       = 'shardId-000000000000'

# SubscribeToShard with HTTP/2 streaming (push instead of polling)
# A subscription lasts at most 5 minutes; call subscribe_to_shard again to keep reading
response = kinesis.subscribe_to_shard(
    ConsumerARN=CONSUMER_ARN,
    ShardId=SHARD_ID,
    StartingPosition={'Type': 'LATEST'}
)

# Process the real-time event stream
event_stream = response['EventStream']
for event in event_stream:
    if 'SubscribeToShardEvent' in event:
        for record in event['SubscribeToShardEvent']['Records']:
            data = json.loads(record['Data'].decode('utf-8'))
            print(f"[Enhanced Fan-Out] Device: {data['device_id']}, HR: {data['heart_rate']}")
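
The throughput difference between the two consumer modes comes down to simple arithmetic, sketched below. The model assumes standard consumers split a shard's 2 MB/s evenly, which is a simplification of real contention; `per_consumer_read_mbps` is a hypothetical helper for this illustration.

```python
SHARD_READ_MBPS = 2.0  # read capacity of one shard


def per_consumer_read_mbps(num_consumers: int, enhanced_fan_out: bool) -> float:
    """Approximate read throughput available to each consumer on one shard."""
    if num_consumers < 1:
        raise ValueError("num_consumers must be at least 1")
    if enhanced_fan_out:
        return SHARD_READ_MBPS              # dedicated pipe per registered consumer
    return SHARD_READ_MBPS / num_consumers  # shared pipe, assumed evenly split


for n in (1, 3, 5):
    shared = per_consumer_read_mbps(n, enhanced_fan_out=False)
    dedicated = per_consumer_read_mbps(n, enhanced_fan_out=True)
    print(f"{n} consumer(s): standard {shared:.2f} MB/s each, fan-out {dedicated:.2f} MB/s each")
```

With a single consumer the two modes offer the same bandwidth, so Enhanced Fan-Out mainly pays off when several applications read the same stream or when latency matters.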

Chapter 02 – Lesson 1: Amazon OpenSearch — Real-Time Indexing and Search

NOTE (Objective): Understand how Amazon OpenSearch Service works: how to create indexes, define mappings, and perform real-time searches and aggregations on streaming data.

1. OpenSearch — Core Concepts

NOTE (OpenSearch): Open-source fork of Elasticsearch (developed by AWS since 2021). It is a distributed search and analytics engine based on Apache Lucene. Amazon OpenSearch Service is the managed version on AWS.

Terminology

Index    ≈ Database table
Document ≈ Row in a table (JSON)
Field    ≈ Column in a table
Shard    = Fragment of an index (distribution)
Replica  = Copy of a shard (high availability)
Mapping  = Schema of field types

When to Use OpenSearch

2. Create an OpenSearch Domain

bash
# Create an OpenSearch domain for IoT data
aws opensearch create-domain \
    --domain-name sante-iot-dashboard \
    --engine-version 'OpenSearch_2.11' \
    --cluster-config '{
        "InstanceType": "t3.small.search",
        "InstanceCount": 2,
        "DedicatedMasterEnabled": false,
        "ZoneAwarenessEnabled": true,
        "ZoneAwarenessConfig": {"AvailabilityZoneCount": 2}
    }' \
    --ebs-options '{
        "EBSEnabled": true,
        "VolumeType": "gp3",
        "VolumeSize": 20,
        "Iops": 3000
    }' \
    --encryption-at-rest-options '{"Enabled": true}' \
    --node-to-node-encryption-options '{"Enabled": true}' \
    --domain-endpoint-options '{"EnforceHTTPS": true, "TLSSecurityPolicy": "Policy-Min-TLS-1-2-2019-07"}' \
    --advanced-security-options '{
        "Enabled": true,
        "InternalUserDatabaseEnabled": true,
        "MasterUserOptions": {
            "MasterUserName": "admin",
            "MasterUserPassword": "VotreMotDePasseSecurise#123"
        }
    }'

# Check domain status
aws opensearch describe-domain \
    --domain-name sante-iot-dashboard \
    --query "DomainStatus.Processing"

3. Create an Index with Mapping

NOTE (Mapping): The mapping defines the data type of each field. OpenSearch can infer types automatically (dynamic mapping), but defining the mapping explicitly for critical fields avoids wrong type guesses that are hard to correct once data is indexed.
python
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

# Connect to the OpenSearch domain
region   = 'ca-central-1'
service  = 'es'
host     = 'https://votre-domaine.ca-central-1.es.amazonaws.com'

credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region, service,
    session_token=credentials.token
)

client = OpenSearch(
    hosts=[{'host': host.replace('https://', ''), 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

# Create the index with an explicit mapping
mapping = {
    "settings": {
        "number_of_shards":   2,    # Distribute across 2 nodes
        "number_of_replicas": 1     # 1 copy for HA
    },
    "mappings": {
        "properties": {
            "device_id":   {"type": "keyword"},     # Exact value (not analyzed)
            "patient_id":  {"type": "keyword"},
            "timestamp":   {"type": "date", "format": "epoch_second"},
            "heart_rate":  {"type": "integer"},
            "spo2":        {"type": "float"},
            "temperature": {"type": "float"},
            "steps":       {"type": "integer"},
            "severite":    {"type": "keyword"},
            "region":      {"type": "keyword"},
            "processed_at":{"type": "date"},
            # Analyzed text field for full-text search
            "notes":       {"type": "text", "analyzer": "french"}
        }
    }
}

response = client.indices.create(index='donnees-cardiaques', body=mapping)
print(f"Index created: {response['acknowledged']}")

# Verify the mapping
mapping_info = client.indices.get_mapping(index='donnees-cardiaques')
print(f"Mapping: {list(mapping_info['donnees-cardiaques']['mappings']['properties'].keys())}")

4. Index Documents

python
import time, json, random

# Index a single document
document = {
    "device_id":   "WATCH-042",
    "patient_id":  "PATIENT-42",
    "timestamp":   int(time.time()),
    "heart_rate":  88,
    "spo2":        97.5,
    "temperature": 37.1,
    "steps":       12453,
    "severite":    "NORMAL",
    "region":      "ca-central-1"
}

response = client.index(
    index='donnees-cardiaques',
    body=document,
    id=f"{document['device_id']}-{document['timestamp']}",  # Unique ID
    refresh=True  # Make the document immediately searchable
)
print(f"Document indexed: {response['result']}")

# Bulk indexing (more efficient for large volumes)
from opensearchpy.helpers import bulk

def generer_donnees(n_montres=100):
    """Generate n cardiac watch records"""
    actions = []
    for i in range(n_montres):
        heart_rate = random.randint(45, 160)  # Wide range so every severity level below can occur
        spo2       = random.uniform(93, 100)

        doc = {
            "_index":    "donnees-cardiaques",
            "_id":       f"WATCH-{i:03d}-{int(time.time())}",
            "_source": {
                "device_id":   f"WATCH-{i:03d}",
                "patient_id":  f"PATIENT-{i}",
                "timestamp":   int(time.time()),
                "heart_rate":  heart_rate,
                "spo2":        round(spo2, 1),
                "temperature": round(random.uniform(36.0, 37.8), 1),
                "steps":       random.randint(0, 20000),
                "severite":    "CRITIQUE" if heart_rate > 140 else "ATTENTION" if heart_rate > 110 else "NORMAL",
                "region":      "ca-central-1"
            }
        }
        actions.append(doc)
    return actions

# Bulk indexing
success, errors = bulk(client, generer_donnees(100))
print(f"Indexed: {success} documents, {len(errors)} errors")
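
In a streaming pipeline the documents come from Kinesis rather than from a generator. The sketch below shows one way to turn records read from a shard into bulk actions, assuming each record's `Data` is UTF-8 encoded JSON with `device_id` and `timestamp` fields as in the producer examples. `kinesis_records_to_actions` is a hypothetical helper, not part of opensearch-py.

```python
import json


def kinesis_records_to_actions(records, index_name="donnees-cardiaques"):
    """Yield one bulk action per record whose Data is valid JSON."""
    for record in records:
        try:
            source = json.loads(record["Data"].decode("utf-8"))
            doc_id = f"{source['device_id']}-{source['timestamp']}"
        except (UnicodeDecodeError, json.JSONDecodeError, KeyError, TypeError):
            continue  # skip malformed records; a real pipeline would log them
        yield {"_index": index_name, "_id": doc_id, "_source": source}


# Two hand-made records: one valid, one malformed
sample = [
    {"Data": json.dumps({"device_id": "WATCH-001", "heart_rate": 72,
                         "timestamp": 1700000000}).encode("utf-8")},
    {"Data": b"not json"},
]
actions = list(kinesis_records_to_actions(sample))
print(actions)
```

The resulting iterable can be passed to `bulk(client, ...)` exactly like the output of `generer_donnees`. Deriving `_id` from device and timestamp makes re-indexing the same record idempotent.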

5. Searches and Aggregations

Simple Search (Term Query)

python
# Find all CRITIQUE records
query_critiques = {
    "query": {
        "term": {"severite": "CRITIQUE"}
    },
    "sort": [{"timestamp": {"order": "desc"}}],
    "size": 20
}

response = client.search(index='donnees-cardiaques', body=query_critiques)
print(f"Critical records: {response['hits']['total']['value']}")
for hit in response['hits']['hits']:
    print(f"  Device: {hit['_source']['device_id']}, HR: {hit['_source']['heart_rate']}")

Search with Range Filter (Range Query)

python
# Find cardiac anomalies from the last 30 minutes
now = int(time.time())
thirty_min_ago = now - 1800

query_anomalies = {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "timestamp": {
                            "gte": thirty_min_ago,
                            "lte": now
                        }
                    }
                }
            ],
            "should": [
                {"range": {"heart_rate": {"gt": 120}}},    # Tachycardia
                {"range": {"heart_rate": {"lt": 45}}},     # Bradycardia
                {"range": {"spo2": {"lt": 90}}}            # Hypoxemia
            ],
            "minimum_should_match": 1
        }
    },
    "sort": [{"timestamp": {"order": "desc"}}]
}

response = client.search(index='donnees-cardiaques', body=query_anomalies)
print(f"Anomalies detected: {response['hits']['total']['value']}")

Aggregations — Real-Time Statistics

python
# Aggregation: cardiac statistics per device (last 24h)
query_stats = {
    "query": {
        "range": {
            "timestamp": {"gte": "now-24h/h", "lte": "now"}
        }
    },
    "aggs": {
        "par_device": {
            "terms": {"field": "device_id", "size": 50},
            "aggs": {
                "fc_moyenne":  {"avg": {"field": "heart_rate"}},
                "fc_max":      {"max": {"field": "heart_rate"}},
                "fc_min":      {"min": {"field": "heart_rate"}},
                "spo2_moyenne":{"avg": {"field": "spo2"}},
                "nb_critiques":{
                    "filter": {"term": {"severite": "CRITIQUE"}}
                }
            }
        },
        "par_severite": {
            "terms": {"field": "severite"},
            "aggs": {
                "nb_patients": {"cardinality": {"field": "patient_id"}}
            }
        },
        "tendance_horaire": {
            "date_histogram": {
                "field":            "timestamp",
                "fixed_interval":   "1h",
                "format":           "yyyy-MM-dd HH:mm"
            },
            "aggs": {
                "fc_moyenne": {"avg": {"field": "heart_rate"}}
            }
        }
    },
    "size": 0   # Return only aggregations, not documents
}

response = client.search(index='donnees-cardiaques', body=query_stats)
buckets = response['aggregations']['par_severite']['buckets']
for b in buckets:
    print(f"Severity {b['key']}: {b['doc_count']} readings, {b['nb_patients']['value']} patients")
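
The loop above reads only the `par_severite` buckets. As a rough sketch, the per-device statistics can be flattened into rows in the same way. The sample response is hand-written in the shape this query returns, with invented values, and `device_stats_rows` is a hypothetical helper.

```python
def device_stats_rows(response):
    """Flatten the par_device buckets into one dict per device."""
    rows = []
    for bucket in response["aggregations"]["par_device"]["buckets"]:
        rows.append({
            "device_id": bucket["key"],
            "readings": bucket["doc_count"],
            "hr_avg": bucket["fc_moyenne"]["value"],
            "hr_max": bucket["fc_max"]["value"],
            "hr_min": bucket["fc_min"]["value"],
            "spo2_avg": bucket["spo2_moyenne"]["value"],
            # A filter sub-aggregation reports its match count as doc_count
            "critical": bucket["nb_critiques"]["doc_count"],
        })
    return rows


# Hand-written sample (values are invented for illustration)
sample_response = {
    "aggregations": {
        "par_device": {
            "buckets": [
                {
                    "key": "WATCH-001",
                    "doc_count": 12,
                    "fc_moyenne": {"value": 78.5},
                    "fc_max": {"value": 131.0},
                    "fc_min": {"value": 58.0},
                    "spo2_moyenne": {"value": 96.8},
                    "nb_critiques": {"doc_count": 0},
                },
            ]
        }
    }
}

for row in device_stats_rows(sample_response):
    print(row)
```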

Chapter 02 – Lesson 2: Amazon Cognito — Authentication for OpenSearch Dashboards

NOTE (Objective): Configure Amazon Cognito to secure access to OpenSearch Dashboards, create distinct roles for different user types (administrators, data scientists, monitoring operators), and apply fine-grained access control to indexes.

1. Why Cognito for OpenSearch?

NOTE (Problem): By default, OpenSearch Dashboards uses basic HTTP authentication (username/password). This is not suitable for an organization with dozens of users, different profiles, and SSO authentication needs.

Without Cognito

With Cognito

2. Cognito + OpenSearch Architecture

output
# Authentication architecture

# USER
#     ↓  (1) Accesses OpenSearch Dashboards
# AMAZON COGNITO USER POOL
#     ↓  (2) Verifies credentials (or SSO via IdP)
#     ↓  (3) Issues a JWT token
# AMAZON COGNITO IDENTITY POOL
#     ↓  (4) Exchanges the JWT for temporary AWS credentials (STS)
#     ↓  (5) Assigns an IAM role based on the Cognito group
# AMAZON OPENSEARCH SERVICE
#     ↓  (6) Checks Fine-Grained Access Control (FGAC) permissions
# INDEX DATA

3. Create the Cognito User Pool

bash
# Step 1: Create the User Pool
aws cognito-idp create-user-pool \
    --pool-name opensearch-sante-users \
    --policies '{
        "PasswordPolicy": {
            "MinimumLength": 12,
            "RequireUppercase": true,
            "RequireLowercase": true,
            "RequireNumbers": true,
            "RequireSymbols": true
        }
    }' \
    --auto-verified-attributes email \
    --mfa-configuration OPTIONAL \
    --sms-configuration '{"SnsCallerArn": "arn:aws:iam::123456789012:role/CognitoSNSRole", "ExternalId": "extId"}' \
    --query "UserPool.Id" --output text
# → ca-central-1_XXXXXXXXX (note this ID)

# Step 2: Create the App Client (no secret for Dashboards)
aws cognito-idp create-user-pool-client \
    --user-pool-id ca-central-1_XXXXXXXXX \
    --client-name opensearch-dashboards-client \
    --no-generate-secret \
    --explicit-auth-flows ALLOW_USER_PASSWORD_AUTH ALLOW_REFRESH_TOKEN_AUTH \
    --supported-identity-providers COGNITO \
    --query "UserPoolClient.ClientId" --output text
# → YYYYYYYYYYYYYYYYYY (note this ID)

Create User Groups

bash
# Group 1: OpenSearch Administrators (full access)
aws cognito-idp create-group \
    --user-pool-id ca-central-1_XXXXXXXXX \
    --group-name opensearch-admins \
    --description "Full administrator access to OpenSearch" \
    --role-arn arn:aws:iam::123456789012:role/OpenSearch-AdminRole

# Group 2: Data Scientists (read-only on all indexes)
aws cognito-idp create-group \
    --user-pool-id ca-central-1_XXXXXXXXX \
    --group-name opensearch-scientists \
    --description "Read-only access for analysis" \
    --role-arn arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole

# Group 3: Monitoring Operators (read access on the monitoring index only)
aws cognito-idp create-group \
    --user-pool-id ca-central-1_XXXXXXXXX \
    --group-name opensearch-operators \
    --description "Operational monitoring only" \
    --role-arn arn:aws:iam::123456789012:role/OpenSearch-OperatorRole

# Create a user and add them to a group
aws cognito-idp admin-create-user \
    --user-pool-id ca-central-1_XXXXXXXXX \
    --username dr.dupont@hopital.ca \
    --user-attributes Name=email,Value=dr.dupont@hopital.ca Name=email_verified,Value=true \
    --temporary-password 'TempPassword123!'

aws cognito-idp admin-add-user-to-group \
    --user-pool-id ca-central-1_XXXXXXXXX \
    --username dr.dupont@hopital.ca \
    --group-name opensearch-scientists

4. Create the Identity Pool

bash
# Create the Identity Pool (links the User Pool to IAM roles)
aws cognito-identity create-identity-pool \
    --identity-pool-name opensearch-sante-identity \
    --no-allow-unauthenticated-identities \
    --cognito-identity-providers '[
        {
            "ProviderName": "cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX",
            "ClientId": "YYYYYYYYYYYYYYYYYY",
            "ServerSideTokenCheck": true
        }
    ]' \
    --query "IdentityPoolId" --output text
# → ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ

# Associate IAM roles with the Identity Pool
aws cognito-identity set-identity-pool-roles \
    --identity-pool-id "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ" \
    --roles authenticated=arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole \
    --role-mappings '{
        "cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX:YYYYYYYYYYYYYYYYYY": {
            "Type": "Rules",
            "AmbiguousRoleResolution": "AuthenticatedRole",
            "RulesConfiguration": {
                "Rules": [
                    {
                        "Claim": "cognito:groups",
                        "MatchType": "Contains",
                        "Value": "opensearch-admins",
                        "RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-AdminRole"
                    },
                    {
                        "Claim": "cognito:groups",
                        "MatchType": "Contains",
                        "Value": "opensearch-operators",
                        "RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-OperatorRole"
                    }
                ]
            }
        }
    }'
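
The rule mapping above can be read as "first matching rule wins, otherwise fall back to the authenticated role". The sketch below is a simplified local model of that behaviour, not Cognito's implementation: it treats `Contains` as membership in the list of groups, and `resolve_role` is a hypothetical helper. The rules and ARNs are copied from the command above.

```python
RULES = [
    {"Claim": "cognito:groups", "MatchType": "Contains", "Value": "opensearch-admins",
     "RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-AdminRole"},
    {"Claim": "cognito:groups", "MatchType": "Contains", "Value": "opensearch-operators",
     "RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-OperatorRole"},
]
AUTHENTICATED_ROLE = "arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole"


def resolve_role(groups, rules=RULES, fallback_role=AUTHENTICATED_ROLE):
    """Return the role of the first rule matching the groups claim.

    Simplified model: rules are checked in order; if none matches, the
    fallback mirrors "AmbiguousRoleResolution": "AuthenticatedRole".
    """
    for rule in rules:
        if rule["MatchType"] == "Contains" and rule["Value"] in groups:
            return rule["RoleARN"]
    return fallback_role


print(resolve_role(["opensearch-admins"]))
print(resolve_role(["opensearch-scientists"]))  # no rule: falls back
```

This also explains why the data scientists group needs no rule of its own in this configuration: its members receive the default authenticated role.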

5. Fine-Grained Access Control in OpenSearch

NOTE (Fine-Grained Access Control, FGAC): OpenSearch can control access at the index, document, and field level. This lets you create custom roles that expose only the data each user needs.
python
# Configure OpenSearch roles via the REST API

import requests, json

host  = 'https://votre-domaine.ca-central-1.es.amazonaws.com'
auth  = ('admin', 'VotreMotDePasseSecurise#123')

# Role 1: Read-only on the donnees-cardiaques index
roles_payload = {
    "cluster_permissions": ["cluster:monitor/main"],
    "index_permissions": [
        {
            "index_patterns": ["donnees-cardiaques*"],
            "allowed_actions": [
                "read",
                "indices:data/read/search",
                "indices:data/read/msearch",
                "indices:admin/mappings/get"
            ]
        }
    ],
    "tenant_permissions": [
        {
            "tenant_patterns": ["global_tenant"],
            "allowed_actions": ["kibana_all_read"]
        }
    ]
}

# Create the "data-scientist-reader" role
resp = requests.put(
    f"{host}/_plugins/_security/api/roles/data-scientist-reader",
    json=roles_payload, auth=auth
)
resp.raise_for_status()  # Stop here if the role was not created

# Map the Cognito IAM role to the OpenSearch role
role_mapping = {
    "backend_roles": [
        "arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole"
    ],
    "description": "Data Scientists have read access to cardiac data"
}

resp = requests.put(
    f"{host}/_plugins/_security/api/rolesmapping/data-scientist-reader",
    json=role_mapping, auth=auth
)
resp.raise_for_status()  # Stop here if the mapping was not created

print("Role and mapping created successfully")
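
The role above restricts access by index only. As a sketch of the document-level and field-level controls mentioned in the note, the payload below adds the security plugin's `dls` and `fls` keys to an index permission. The restriction choices (critical readings only, `patient_id` and `notes` hidden) are assumptions made for illustration, not settings from the course.

```python
import json

restricted_role = {
    "cluster_permissions": ["cluster:monitor/main"],
    "index_permissions": [
        {
            "index_patterns": ["donnees-cardiaques*"],
            # Document-level security: a query, passed as a JSON string,
            # that limits which documents the role can see
            "dls": json.dumps({"term": {"severite": "CRITIQUE"}}),
            # Field-level security: a leading "~" excludes the field
            "fls": ["~patient_id", "~notes"],
            "allowed_actions": ["read"],
        }
    ],
}

print(json.dumps(restricted_role, indent=2))
```

Such a payload would be sent with the same `requests.put` call to the `roles` endpoint, under a role name of your choosing.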

6. Enable Cognito in OpenSearch

bash
# Enable Cognito authentication on the OpenSearch domain
aws opensearch update-domain-config \
    --domain-name sante-iot-dashboard \
    --cognito-options '{
        "Enabled": true,
        "UserPoolId": "ca-central-1_XXXXXXXXX",
        "IdentityPoolId": "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ",
        "RoleArn": "arn:aws:iam::123456789012:role/CognitoAccessForAmazonOpenSearch"
    }'

# Wait for the update to complete
aws opensearch describe-domain \
    --domain-name sante-iot-dashboard \
    --query "DomainStatus.Processing"
# Wait until the value is "false"

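# Retrieve the domain endpoint (Dashboards is served at https://<endpoint>/_dashboards)
# A public domain exposes "Endpoint"; a VPC domain exposes "Endpoints" instead
aws opensearch describe-domain \
    --domain-name sante-iot-dashboard \
    --query "DomainStatus.Endpoint"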
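
Checking `Processing` by hand gets tedious, since configuration changes can take several minutes. A minimal polling sketch is shown below. It relies only on the `describe_domain` call of the boto3 `opensearch` client; `wait_until_domain_ready`, `poll_seconds`, and `timeout_seconds` are hypothetical names chosen for this example.

```python
import time


def wait_until_domain_ready(client, domain_name, poll_seconds=30, timeout_seconds=3600):
    """Poll describe_domain until DomainStatus.Processing is False.

    Returns the DomainStatus dict, or raises TimeoutError.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.describe_domain(DomainName=domain_name)["DomainStatus"]
        if not status.get("Processing", False):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{domain_name} is still processing after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

A typical call would be `wait_until_domain_ready(boto3.client('opensearch', region_name='ca-central-1'), 'sante-iot-dashboard')`, after which the returned status contains the endpoint.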
This article covers the most useful excerpts — the full AWS Real-Time Data course (6 chapters, 14 lessons, corrected exercises and final project) takes you all the way.

FAQ

How long does it take to learn AWS Real-Time Data?
With a structured progression (6 chapters, 14 short practical lessons), you reach an operational level in a few weeks at 30 to 60 minutes per day. The key is to practice each concept immediately.
Are there any prerequisites?
Basic computer knowledge is sufficient. If you can use a terminal and read simple code, you are ready.
Where should I start?
Reproduce the commands in this article, then follow the full AWS Real-Time Data course: it chains the 14 lessons in order, with exercises and a final project.
