Lance-toi en AWS Données Temps Réel : ton premier pas concret aujourd'hui

AWS Données Temps Réel : l'essentiel en un article — vrai code, schémas et étapes concrètes, extraits d'un cours de 14 leçons.

Lance-toi en AWS Données Temps Réel : ton premier pas concret aujourd'hui

La meilleure façon d'apprendre AWS Données Temps Réel, c'est de faire. Cet article te met le pied à l'étrier avec des extraits pratiques tirés d'un cours de 14 leçons — de quoi obtenir un premier résultat dès aujourd'hui.

tl;dr
  • Introduction au Streaming de Donnees AWS
  • Amazon Kinesis
  • Amazon OpenSearch
  • Securite et Encryption
  • Kafka vs Kinesis et Alternatives
~$ cat ./parcours.md # AWS Données Temps Réel — 6 chapitres
01
Introduction au Streaming de Données AWS
→ Chapitre 00 – Leçon 1 : Données en Temps Réel vs Traitement par Lots (Batch)→ Chapitre 00 – Leçon 2 : L'Écosystème AWS pour le Streaming de Données
02
Amazon Kinesis
→ Chapitre 01 – Leçon 1 : Amazon Kinesis Data Streams — Shards et Consommateurs→ Chapitre 01 – Leçon 2 : Kinesis Firehose — Livraison Managée et Transformation+ 1 autres leçons
03
Amazon OpenSearch
→ Chapitre 02 – Leçon 1 : Amazon OpenSearch — Indexation et Recherche en Temps Réel→ Chapitre 02 – Leçon 2 : Amazon Cognito — Authentification pour OpenSearch Dashboards+ 1 autres leçons
04
Sécurité et Encryption
→ Chapitre 03 – Leçon 1 : AWS KMS — Chiffrement des Données dans Kinesis et Firehose→ Chapitre 03 – Leçon 2 : IAM, VPC et Bonnes Pratiques de Sécurité pour le Streaming
05
Kafka vs Kinesis et Alternatives
→ Chapitre 04 – Leçon 1 : Apache Kafka vs Amazon Kinesis — Comparaison Complète→ Chapitre 04 – Leçon 2 : Migration Kafka vers Kinesis / MSK et Patterns Hybrides
06
Architectures de Référence
→ Chapitre 05 – Leçon 1 : Architecture de Référence — IoT Wearables Médicaux→ Chapitre 05 – Leçon 2 : Architecture de Référence — Détection de Fraude E-Commerce en Temps Réel
🏁
Projet final
→ Tu repars avec un projet concret et démontrable

Chapitre 01 – Leçon 1 : Amazon Kinesis Data Streams — Shards et Consommateurs

NOTEObjectif — Comprendre l'architecture interne de Kinesis Data Streams : comment les shards permettent le parallélisme, comment les producteurs publient des données, et comment les consommateurs les traitent — avec et sans Enhanced Fan-Out.

1. Architecture des Shards — L'Unité de Base

NOTEShard — Un shard est une unité de capacité isolée dans un flux Kinesis. Chaque shard peut ingérer jusqu'à 1 MB/s ou 1 000 enregistrements/s en entrée et fournir jusqu'à 2 MB/s en sortie (par consommateur).
output
# Capacité par shard
# ─────────────────────────────────────
# Écriture (PUT) :  1 MB/s  OU  1 000 records/s  (la limite la plus basse s'applique)
# Lecture  (GET) :  2 MB/s  par GetRecords() - partagée entre TOUS les consommateurs
#
# → Pour 5 Mo/s d'ingestion : vous avez besoin de 5 shards
# → Pour 10 000 records/s    : vous avez besoin de 10 shards

# Calculer le nombre de shards nécessaires
max_ingestion_rate = 5    # MB/s
max_record_rate    = 3000 # records/s

shards_pour_volume = max_ingestion_rate / 1    # = 5 shards
shards_pour_taux   = max_record_rate / 1000    # = 3 shards

nb_shards = max(shards_pour_volume, shards_pour_taux)
print(f"Shards nécessaires : {int(nb_shards)}")  # → 5 shards

Partition Key — Comment les Données sont Distribuées

NOTEPartition Key — Chaque enregistrement possède une clé de partition. Kinesis applique une fonction de hachage MD5 sur cette clé pour déterminer dans quel shard l'enregistrement est routé. Les enregistrements avec la même clé de partition vont toujours dans le même shard (ordre garanti).
output
# Exemples de Partition Keys et leur impact

# ✅ BON : clé à haute cardinalité → distribution uniforme entre shards
# device_id = "WATCH-001", "WATCH-002", ..., "WATCH-10000"
kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(payload),
    PartitionKey=payload['device_id']   # Des milliers de valeurs uniques
)

# ⚠️ PROBLÈME : clé à faible cardinalité → "hot shard" (un shard surchargé)
# Si tous les enregistrements utilisent la même clé :
kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(payload),
    PartitionKey='donnees-sante'   # ← Toujours le même shard ! Goulot d'étranglement
)

# ✅ SOLUTION si pas de clé naturelle : clé aléatoire
import uuid
kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(payload),
    PartitionKey=str(uuid.uuid4())   # Aléatoire = distribution parfaite
    # Attention : perte de l'ordre dans le shard !
)

2. Producteurs — Envoyer des Données

put_record vs put_records

output
import boto3, json, time

kinesis = boto3.client('kinesis', region_name='ca-central-1')

# Méthode 1 : put_record — un seul enregistrement
# Latence : ~70ms par appel
donnee = {
    "device_id": "WATCH-001",
    "heart_rate": 72,
    "timestamp": int(time.time())
}
response = kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(donnee).encode('utf-8'),
    PartitionKey=donnee['device_id']
)
print(f"ShardId: {response['ShardId']}")

# ─────────────────────────────────────

# Méthode 2 : put_records — plusieurs enregistrements en batch (RECOMMANDÉ)
# Latence : ~70ms pour jusqu'à 500 enregistrements !
# Limite : max 500 enregistrements ou 5MB par appel
donnees = [
    {"device_id": f"WATCH-{i:03d}", "heart_rate": 60 + i, "timestamp": int(time.time())}
    for i in range(10)
]

records = [
    {
        'Data': json.dumps(d).encode('utf-8'),
        'PartitionKey': d['device_id']
    }
    for d in donnees
]

response = kinesis.put_records(
    StreamName='donnees-sante',
    Records=records
)

print(f"Enregistrements OK: {response['Records'].__len__()} envoyés")
print(f"Échecs: {response['FailedRecordCount']}")

# Gérer les échecs partiels (put_records peut avoir des succès partiels)
failed_records = [
    records[i] for i, r in enumerate(response['Records'])
    if 'ErrorCode' in r
]
if failed_records:
    print(f"Retry nécessaire pour {len(failed_records)} enregistrements")

3. Consommateurs — Lire les Données

Mode Standard (GetRecords)

NOTEGetRecords Standard — Tous les consommateurs partagent la limite de 2 MB/s par shard. Si vous avez 3 consommateurs sur 1 shard, chacun n'obtient que ~667 KB/s.
output
# Consommateur standard avec GetRecords
import boto3, json, time

kinesis = boto3.client('kinesis', region_name='ca-central-1')
STREAM_NAME = 'donnees-sante'

# 1. Lister les shards
response = kinesis.describe_stream(StreamName=STREAM_NAME)
shards = response['StreamDescription']['Shards']

for shard in shards:
    shard_id = shard['ShardId']

    # 2. Obtenir un itérateur de shard (LATEST = nouvelles données seulement)
    iterator_response = kinesis.get_shard_iterator(
        StreamName=STREAM_NAME,
        ShardId=shard_id,
        ShardIteratorType='LATEST'  # Ou 'TRIM_HORIZON' pour lire depuis le début
    )
    shard_iterator = iterator_response['ShardIterator']

    # 3. Boucle de lecture
    while True:
        records_response = kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=100  # Max 100 enregistrements par appel (ou 10MB)
        )

        for record in records_response['Records']:
            data = json.loads(record['Data'].decode('utf-8'))
            print(f"Device: {data['device_id']}, HR: {data['heart_rate']}")

        shard_iterator = records_response['NextShardIterator']

        # Limitation : max 5 appels GetRecords/s par shard
        if not records_response['Records']:
            time.sleep(1)  # Attendre si pas de nouvelles données

Enhanced Fan-Out — Consommateurs Dédiés

TIPEnhanced Fan-Out — Chaque consommateur enregistré obtient sa propre capacité de 2 MB/s par shard (au lieu de partager). Utilise HTTP/2 push au lieu de polling. Idéal pour les applications critiques nécessitant une faible latence (<70ms).
bash
# Enregistrer un consommateur Enhanced Fan-Out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante \
    --consumer-name alertes-medicales-temps-reel

# Lister les consommateurs enregistrés
aws kinesis list-stream-consumers \
    --stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante
output
# Consommateur avec Enhanced Fan-Out (SubscribeToShard)
import boto3, json

kinesis = boto3.client('kinesis', region_name='ca-central-1')

STREAM_ARN     = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante'
CONSUMER_ARN   = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante/consumer/alertes-medicales:1234567890'
SHARD_ID       = 'shardId-000000000000'

# SubscribeToShard avec HTTP/2 streaming (push au lieu de polling)
response = kinesis.subscribe_to_shard(
    ConsumerARN=CONSUMER_ARN,
    ShardId=SHARD_ID,
    StartingPosition={'Type': 'LATEST'}
)

# Traiter le flux d'événements en temps réel
event_stream = response['EventStream']
for event in event_stream:
    if 'SubscribeToShardEvent' in event:
        for record in event['SubscribeToShardEvent']['Records']:
            data = json.loads(record['Data'].decode('utf-8'))
            print(f"[Enhanced Fan-Out] Device: {data['device_id']}, HR: {data['heart_rate']}")

Chapitre 02 – Leçon 1 : Amazon OpenSearch — Indexation et Recherche en Temps Réel

NOTEObjectif — Comprendre le fonctionnement d'Amazon OpenSearch Service : comment créer des index, définir des mappings, et effectuer des recherches et agrégations en temps réel sur des données de streaming.

1. OpenSearch — Concepts Fondamentaux

NOTEOpenSearch — Fork open-source d'Elasticsearch (développé par AWS depuis 2021). Moteur de recherche et d'analytics distribué basé sur Apache Lucene. Amazon OpenSearch Service est la version managée sur AWS.

Terminologie

Index≈ Table d'une base de données
Document≈ Ligne dans une table (JSON)
Field≈ Colonne dans une table
ShardFragment d'un index (distribution)
ReplicaCopie d'un shard (haute disponibilité)
MappingSchéma des types de champs

Quand Utiliser OpenSearch

2. Créer un Domaine OpenSearch

bash
# Créer un domaine OpenSearch pour données IoT
aws opensearch create-domain \
    --domain-name sante-iot-dashboard \
    --engine-version 'OpenSearch_2.11' \
    --cluster-config '{
        "InstanceType": "t3.small.search",
        "InstanceCount": 2,
        "DedicatedMasterEnabled": false,
        "ZoneAwarenessEnabled": true,
        "ZoneAwarenessConfig": {"AvailabilityZoneCount": 2}
    }' \
    --ebs-options '{
        "EBSEnabled": true,
        "VolumeType": "gp3",
        "VolumeSize": 20,
        "Iops": 3000
    }' \
    --encryption-at-rest-options '{"Enabled": true}' \
    --node-to-node-encryption-options '{"Enabled": true}' \
    --domain-endpoint-options '{"EnforceHTTPS": true, "TLSSecurityPolicy": "Policy-Min-TLS-1-2-2019-07"}' \
    --advanced-security-options '{
        "Enabled": true,
        "InternalUserDatabaseEnabled": true,
        "MasterUserOptions": {
            "MasterUserName": "admin",
            "MasterUserPassword": "VotreMotDePasseSecurise#123"
        }
    }'

# Vérifier le statut du domaine
aws opensearch describe-domain \
    --domain-name sante-iot-dashboard \
    --query "DomainStatus.Processing"

3. Créer un Index avec Mapping

NOTEMapping — Le mapping définit les types de données de chaque champ. OpenSearch peut auto-détecter les types (dynamic mapping), mais il est recommandé de définir explicitement le mapping pour les champs critiques afin d'éviter les erreurs de typage.
output
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

# Connexion au domaine OpenSearch
region   = 'ca-central-1'
service  = 'es'
host     = 'https://votre-domaine.ca-central-1.es.amazonaws.com'

credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region, service,
    session_token=credentials.token
)

client = OpenSearch(
    hosts=[{'host': host.replace('https://', ''), 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

# Créer l'index avec un mapping explicite
mapping = {
    "settings": {
        "number_of_shards":   2,    # Distribuer sur 2 nœuds
        "number_of_replicas": 1     # 1 copie pour la HA
    },
    "mappings": {
        "properties": {
            "device_id":   {"type": "keyword"},     # Valeur exacte (non analysée)
            "patient_id":  {"type": "keyword"},
            "timestamp":   {"type": "date", "format": "epoch_second"},
            "heart_rate":  {"type": "integer"},
            "spo2":        {"type": "float"},
            "temperature": {"type": "float"},
            "steps":       {"type": "integer"},
            "severite":    {"type": "keyword"},
            "region":      {"type": "keyword"},
            "processed_at":{"type": "date"},
            # Champ texte analysé pour la recherche full-text
            "notes":       {"type": "text", "analyzer": "french"}
        }
    }
}

response = client.indices.create(index='donnees-cardiaques', body=mapping)
print(f"Index créé : {response['acknowledged']}")

# Vérifier le mapping
mapping_info = client.indices.get_mapping(index='donnees-cardiaques')
print(f"Mapping : {list(mapping_info['donnees-cardiaques']['mappings']['properties'].keys())}")

4. Indexer des Documents

output
import time, json, random

# Indexer un document individuel
document = {
    "device_id":   "WATCH-042",
    "patient_id":  "PATIENT-42",
    "timestamp":   int(time.time()),
    "heart_rate":  88,
    "spo2":        97.5,
    "temperature": 37.1,
    "steps":       12453,
    "severite":    "NORMAL",
    "region":      "ca-central-1"
}

response = client.index(
    index='donnees-cardiaques',
    body=document,
    id=f"{document['device_id']}-{document['timestamp']}",  # ID unique
    refresh=True  # Rend le document immédiatement recherchable
)
print(f"Document indexé : {response['result']}")

# Indexation en bulk (plus efficace pour de grands volumes)
from opensearchpy.helpers import bulk

def generer_donnees(n_montres=100):
    """Générer n enregistrements de montres cardiaques"""
    actions = []
    for i in range(n_montres):
        heart_rate = random.randint(55, 115)
        spo2       = random.uniform(93, 100)

        doc = {
            "_index":    "donnees-cardiaques",
            "_id":       f"WATCH-{i:03d}-{int(time.time())}",
            "_source": {
                "device_id":   f"WATCH-{i:03d}",
                "patient_id":  f"PATIENT-{i}",
                "timestamp":   int(time.time()),
                "heart_rate":  heart_rate,
                "spo2":        round(spo2, 1),
                "temperature": round(random.uniform(36.0, 37.8), 1),
                "steps":       random.randint(0, 20000),
                "severite":    "CRITIQUE" if heart_rate > 140 else "ATTENTION" if heart_rate > 110 else "NORMAL",
                "region":      "ca-central-1"
            }
        }
        actions.append(doc)
    return actions

# Indexation bulk
success, errors = bulk(client, generer_donnees(100))
print(f"Indexé : {success} documents, {len(errors)} erreurs")

5. Recherches et Agrégations

Recherche Simple (Match Query)

output
# Trouver tous les enregistrements CRITIQUES
query_critiques = {
    "query": {
        "term": {"severite": "CRITIQUE"}
    },
    "sort": [{"timestamp": {"order": "desc"}}],
    "size": 20
}

response = client.search(index='donnees-cardiaques', body=query_critiques)
print(f"Enregistrements critiques : {response['hits']['total']['value']}")
for hit in response['hits']['hits']:
    print(f"  Device: {hit['_source']['device_id']}, HR: {hit['_source']['heart_rate']}")

Recherche avec Filtre sur Plage (Range Query)

output
# Trouver les anomalies cardiaques des 30 dernières minutes
now = int(time.time())
thirty_min_ago = now - 1800

query_anomalies = {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "timestamp": {
                            "gte": thirty_min_ago,
                            "lte": now
                        }
                    }
                }
            ],
            "should": [
                {"range": {"heart_rate": {"gt": 120}}},    # Tachycardie
                {"range": {"heart_rate": {"lt": 45}}},     # Bradycardie
                {"range": {"spo2": {"lt": 90}}}            # Hypoxémie
            ],
            "minimum_should_match": 1
        }
    },
    "sort": [{"timestamp": {"order": "desc"}}]
}

response = client.search(index='donnees-cardiaques', body=query_anomalies)
print(f"Anomalies détectées : {response['hits']['total']['value']}")

Agrégations — Statistiques en Temps Réel

output
# Agrégation : statistiques cardiaques par device (dernières 24h)
query_stats = {
    "query": {
        "range": {
            "timestamp": {"gte": "now-24h/h", "lte": "now"}
        }
    },
    "aggs": {
        "par_device": {
            "terms": {"field": "device_id", "size": 50},
            "aggs": {
                "fc_moyenne":  {"avg": {"field": "heart_rate"}},
                "fc_max":      {"max": {"field": "heart_rate"}},
                "fc_min":      {"min": {"field": "heart_rate"}},
                "spo2_moyenne":{"avg": {"field": "spo2"}},
                "nb_critiques":{
                    "filter": {"term": {"severite": "CRITIQUE"}}
                }
            }
        },
        "par_severite": {
            "terms": {"field": "severite"},
            "aggs": {
                "nb_patients": {"cardinality": {"field": "patient_id"}}
            }
        },
        "tendance_horaire": {
            "date_histogram": {
                "field":            "timestamp",
                "fixed_interval":   "1h",
                "format":           "yyyy-MM-dd HH:mm"
            },
            "aggs": {
                "fc_moyenne": {"avg": {"field": "heart_rate"}}
            }
        }
    },
    "size": 0   # Ne retourner que les agrégations, pas les documents
}

response = client.search(index='donnees-cardiaques', body=query_stats)
buckets = response['aggregations']['par_severite']['buckets']
for b in buckets:
    print(f"Sévérité {b['key']}: {b['doc_count']} lectures, {b['nb_patients']['value']} patients")

Chapitre 02 – Leçon 2 : Amazon Cognito — Authentification pour OpenSearch Dashboards

NOTEObjectif — Configurer Amazon Cognito pour sécuriser l'accès aux OpenSearch Dashboards, créer des rôles distincts pour différents types d'utilisateurs (administrateurs, data scientists, opérateurs de monitoring), et appliquer un contrôle d'accès précis aux index.

1. Pourquoi Cognito pour OpenSearch ?

NOTEProblème — Par défaut, OpenSearch Dashboards utilise une authentification HTTP basique (nom d'utilisateur/mot de passe). Ce n'est pas adapté pour une organisation avec des dizaines d'utilisateurs, des profils différents et des besoins d'authentification SSO.

Sans Cognito

Avec Cognito

2. Architecture Cognito + OpenSearch

output
# Architecture d'authentification

# UTILISATEUR
#     ↓  (1) Accède à OpenSearch Dashboards
# AMAZON COGNITO USER POOL
#     ↓  (2) Vérifie les credentials (ou SSO via IdP)
#     ↓  (3) Émet un token JWT
# AMAZON COGNITO IDENTITY POOL
#     ↓  (4) Échange le JWT contre des credentials AWS temporaires (STS)
#     ↓  (5) Assigne un rôle IAM selon le groupe Cognito
# AMAZON OPENSEARCH SERVICE
#     ↓  (6) Vérifie les permissions Fine-Grained Access Control (FGAC)
# DONNÉES DE L'INDEX

3. Créer le User Pool Cognito

bash
# Étape 1 : Créer le User Pool
aws cognito-idp create-user-pool \
    --pool-name opensearch-sante-users \
    --policies '{
        "PasswordPolicy": {
            "MinimumLength": 12,
            "RequireUppercase": true,
            "RequireLowercase": true,
            "RequireNumbers": true,
            "RequireSymbols": true
        }
    }' \
    --auto-verified-attributes email \
    --mfa-configuration OPTIONAL \
    --sms-configuration '{"SnsCallerArn": "arn:aws:iam::123456789012:role/CognitoSNSRole", "ExternalId": "extId"}' \
    --query "UserPool.Id" --output text
# → us-east-1_XXXXXXXXX (notez cet ID)

# Étape 2 : Créer le App Client (sans secret pour Dashboards)
aws cognito-idp create-user-pool-client \
    --user-pool-id us-east-1_XXXXXXXXX \
    --client-name opensearch-dashboards-client \
    --no-generate-secret \
    --explicit-auth-flows ALLOW_USER_PASSWORD_AUTH ALLOW_REFRESH_TOKEN_AUTH \
    --supported-identity-providers COGNITO \
    --query "UserPoolClient.ClientId" --output text
# → YYYYYYYYYYYYYYYYYY (notez cet ID)

Créer les Groupes d'Utilisateurs

bash
# Groupe 1 : Administrateurs OpenSearch (accès total)
aws cognito-idp create-group \
    --user-pool-id us-east-1_XXXXXXXXX \
    --group-name opensearch-admins \
    --description "Accès administrateur complet à OpenSearch" \
    --role-arn arn:aws:iam::123456789012:role/OpenSearch-AdminRole

# Groupe 2 : Data Scientists (lecture seule sur tous les index)
aws cognito-idp create-group \
    --user-pool-id us-east-1_XXXXXXXXX \
    --group-name opensearch-scientists \
    --description "Lecture seule pour analyse" \
    --role-arn arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole

# Groupe 3 : Opérateurs de monitoring (lecture sur l'index monitoring seulement)
aws cognito-idp create-group \
    --user-pool-id us-east-1_XXXXXXXXX \
    --group-name opensearch-operators \
    --description "Monitoring opérationnel uniquement" \
    --role-arn arn:aws:iam::123456789012:role/OpenSearch-OperatorRole

# Créer un utilisateur et l'ajouter à un groupe
aws cognito-idp admin-create-user \
    --user-pool-id us-east-1_XXXXXXXXX \
    --username dr.dupont@hopital.ca \
    --user-attributes Name=email,Value=dr.dupont@hopital.ca Name=email_verified,Value=true \
    --temporary-password TempPassword123!

aws cognito-idp admin-add-user-to-group \
    --user-pool-id us-east-1_XXXXXXXXX \
    --username dr.dupont@hopital.ca \
    --group-name opensearch-scientists

4. Créer l'Identity Pool

bash
# Créer l'Identity Pool (lie le User Pool aux rôles IAM)
aws cognito-identity create-identity-pool \
    --identity-pool-name opensearch-sante-identity \
    --allow-unauthenticated-identities false \
    --cognito-identity-providers '[
        {
            "ProviderName": "cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX",
            "ClientId": "YYYYYYYYYYYYYYYYYY",
            "ServerSideTokenCheck": true
        }
    ]' \
    --query "IdentityPoolId" --output text
# → ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ

# Associer les rôles IAM à l'Identity Pool
aws cognito-identity set-identity-pool-roles \
    --identity-pool-id "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ" \
    --roles authenticated=arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole \
    --role-mappings '{
        "cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX:YYYYYYYYYYYYYYYYYY": {
            "Type": "Rules",
            "AmbiguousRoleResolution": "AuthenticatedRole",
            "RulesConfiguration": {
                "Rules": [
                    {
                        "Claim": "cognito:groups",
                        "MatchType": "Contains",
                        "Value": "opensearch-admins",
                        "RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-AdminRole"
                    },
                    {
                        "Claim": "cognito:groups",
                        "MatchType": "Contains",
                        "Value": "opensearch-operators",
                        "RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-OperatorRole"
                    }
                ]
            }
        }
    }'

5. Fine-Grained Access Control dans OpenSearch

NOTEFine-Grained Access Control (FGAC) — OpenSearch permet de contrôler les accès au niveau de l'index, du type de document, et même des champs individuels. Cela permet de créer des rôles personnalisés qui n'exposent que les données nécessaires.
output
# Configurer les rôles OpenSearch via l'API REST

import requests, json

host  = 'https://votre-domaine.ca-central-1.es.amazonaws.com'
auth  = ('admin', 'VotreMotDePasseSecurise#123')

# Rôle 1 : Lecture seule sur l'index données-cardiaques
roles_payload = {
    "cluster_permissions": ["cluster:monitor/main"],
    "index_permissions": [
        {
            "index_patterns": ["donnees-cardiaques*"],
            "allowed_actions": [
                "read",
                "indices:data/read/search",
                "indices:data/read/msearch",
                "indices:admin/mappings/get"
            ]
        }
    ],
    "tenant_permissions": [
        {
            "tenant_patterns": ["global_tenant"],
            "allowed_actions": ["kibana_all_read"]
        }
    ]
}

# Créer le rôle "data-scientist-reader"
requests.put(
    f"{host}/_plugins/_security/api/roles/data-scientist-reader",
    json=roles_payload, auth=auth
)

# Mapper le rôle IAM Cognito vers le rôle OpenSearch
role_mapping = {
    "backend_roles": [
        "arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole"
    ],
    "description": "Data Scientists ont accès en lecture aux données cardiaques"
}

requests.put(
    f"{host}/_plugins/_security/api/rolesmapping/data-scientist-reader",
    json=role_mapping, auth=auth
)

print("Rôle et mapping créés avec succès")

6. Activer Cognito dans OpenSearch

bash
# Activer l'authentification Cognito sur le domaine OpenSearch
aws opensearch update-domain-config \
    --domain-name sante-iot-dashboard \
    --cognito-options '{
        "Enabled": true,
        "UserPoolId": "ca-central-1_XXXXXXXXX",
        "IdentityPoolId": "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ",
        "RoleArn": "arn:aws:iam::123456789012:role/CognitoAccessForAmazonOpenSearch"
    }'

# Attendre que la mise à jour soit terminée
aws opensearch describe-domain \
    --domain-name sante-iot-dashboard \
    --query "DomainStatus.Processing"
# Attendre que la valeur soit "false"

# Récupérer l'URL des Dashboards
aws opensearch describe-domain \
    --domain-name sante-iot-dashboard \
    --query "DomainStatus.Endpoints"
va-plus-loin

Cet article couvre les extraits les plus utiles — le cours complet AWS Données Temps Réel (6 chapitres, 14 leçons, exercices corrigés et projet final) t'emmène jusqu'au bout.

./acceder-au-cours-complet cours gratuit : Maîtriser Claude Code

FAQ

Combien de temps pour apprendre AWS Données Temps Réel ?
Avec une progression structurée (6 chapitres, 14 leçons courtes et pratiques), on atteint un niveau opérationnel en quelques semaines à raison de 30 à 60 minutes par jour. L'important est de pratiquer chaque notion immédiatement.
Faut-il des prérequis ?
Des bases en informatique suffisent. Si tu sais utiliser un terminal et lire du code simple, tu es prêt.
Par où commencer concrètement ?
Reproduis les commandes de cet article, puis suis le cours complet AWS Données Temps Réel : il enchaîne les 14 leçons dans l'ordre, avec exercices et projet final.

📬 Tu veux recevoir ce type de guide chaque semaine ? Abonne-toi gratuitement — code réel, zéro blabla.