Lance-toi en AWS Données Temps Réel : ton premier pas concret aujourd'hui
AWS Données Temps Réel : l'essentiel en un article — vrai code, schémas et étapes concrètes, extraits d'un cours de 14 leçons.
La meilleure façon d'apprendre AWS Données Temps Réel, c'est de faire. Cet article te met le pied à l'étrier avec des extraits pratiques tirés d'un cours de 14 leçons — de quoi obtenir un premier résultat dès aujourd'hui.
tl;dr
- Introduction au Streaming de Donnees AWS
- Amazon Kinesis
- Amazon OpenSearch
- Securite et Encryption
- Kafka vs Kinesis et Alternatives
~$ cat ./parcours.md # AWS Données Temps Réel — 6 chapitres
01
Introduction au Streaming de Données AWS
→ Chapitre 00 – Leçon 1 : Données en Temps Réel vs Traitement par Lots (Batch)→ Chapitre 00 – Leçon 2 : L'Écosystème AWS pour le Streaming de Données
02
Amazon Kinesis
→ Chapitre 01 – Leçon 1 : Amazon Kinesis Data Streams — Shards et Consommateurs→ Chapitre 01 – Leçon 2 : Kinesis Firehose — Livraison Managée et Transformation+ 1 autres leçons
03
Amazon OpenSearch
→ Chapitre 02 – Leçon 1 : Amazon OpenSearch — Indexation et Recherche en Temps Réel→ Chapitre 02 – Leçon 2 : Amazon Cognito — Authentification pour OpenSearch Dashboards+ 1 autres leçons
04
Sécurité et Encryption
→ Chapitre 03 – Leçon 1 : AWS KMS — Chiffrement des Données dans Kinesis et Firehose→ Chapitre 03 – Leçon 2 : IAM, VPC et Bonnes Pratiques de Sécurité pour le Streaming
05
Kafka vs Kinesis et Alternatives
→ Chapitre 04 – Leçon 1 : Apache Kafka vs Amazon Kinesis — Comparaison Complète→ Chapitre 04 – Leçon 2 : Migration Kafka vers Kinesis / MSK et Patterns Hybrides
06
Architectures de Référence
→ Chapitre 05 – Leçon 1 : Architecture de Référence — IoT Wearables Médicaux→ Chapitre 05 – Leçon 2 : Architecture de Référence — Détection de Fraude E-Commerce en Temps Réel
🏁
Projet final
→ Tu repars avec un projet concret et démontrable
Chapitre 01 – Leçon 1 : Amazon Kinesis Data Streams — Shards et Consommateurs
NOTEObjectif — Comprendre l'architecture interne de Kinesis Data Streams : comment les shards permettent le parallélisme, comment les producteurs publient des données, et comment les consommateurs les traitent — avec et sans Enhanced Fan-Out.
1. Architecture des Shards — L'Unité de Base
NOTEShard — Un shard est une unité de capacité isolée dans un flux Kinesis. Chaque shard peut ingérer jusqu'à 1 MB/s ou 1 000 enregistrements/s en entrée et fournir jusqu'à 2 MB/s en sortie (par consommateur).
# Capacité par shard
# ─────────────────────────────────────
# Écriture (PUT) : 1 MB/s OU 1 000 records/s (la limite la plus basse s'applique)
# Lecture (GET) : 2 MB/s par GetRecords() - partagée entre TOUS les consommateurs
#
# → Pour 5 Mo/s d'ingestion : vous avez besoin de 5 shards
# → Pour 10 000 records/s : vous avez besoin de 10 shards
# Calculer le nombre de shards nécessaires
max_ingestion_rate = 5 # MB/s
max_record_rate = 3000 # records/s
shards_pour_volume = max_ingestion_rate / 1 # = 5 shards
shards_pour_taux = max_record_rate / 1000 # = 3 shards
nb_shards = max(shards_pour_volume, shards_pour_taux)
print(f"Shards nécessaires : {int(nb_shards)}") # → 5 shardsPartition Key — Comment les Données sont Distribuées
NOTEPartition Key — Chaque enregistrement possède une clé de partition. Kinesis applique une fonction de hachage MD5 sur cette clé pour déterminer dans quel shard l'enregistrement est routé. Les enregistrements avec la même clé de partition vont toujours dans le même shard (ordre garanti).
# Exemples de Partition Keys et leur impact
# ✅ BON : clé à haute cardinalité → distribution uniforme entre shards
# device_id = "WATCH-001", "WATCH-002", ..., "WATCH-10000"
kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(payload),
PartitionKey=payload['device_id'] # Des milliers de valeurs uniques
)
# ⚠️ PROBLÈME : clé à faible cardinalité → "hot shard" (un shard surchargé)
# Si tous les enregistrements utilisent la même clé :
kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(payload),
PartitionKey='donnees-sante' # ← Toujours le même shard ! Goulot d'étranglement
)
# ✅ SOLUTION si pas de clé naturelle : clé aléatoire
import uuid
kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(payload),
PartitionKey=str(uuid.uuid4()) # Aléatoire = distribution parfaite
# Attention : perte de l'ordre dans le shard !
)2. Producteurs — Envoyer des Données
put_record vs put_records
import boto3, json, time
kinesis = boto3.client('kinesis', region_name='ca-central-1')
# Méthode 1 : put_record — un seul enregistrement
# Latence : ~70ms par appel
donnee = {
"device_id": "WATCH-001",
"heart_rate": 72,
"timestamp": int(time.time())
}
response = kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(donnee).encode('utf-8'),
PartitionKey=donnee['device_id']
)
print(f"ShardId: {response['ShardId']}")
# ─────────────────────────────────────
# Méthode 2 : put_records — plusieurs enregistrements en batch (RECOMMANDÉ)
# Latence : ~70ms pour jusqu'à 500 enregistrements !
# Limite : max 500 enregistrements ou 5MB par appel
donnees = [
{"device_id": f"WATCH-{i:03d}", "heart_rate": 60 + i, "timestamp": int(time.time())}
for i in range(10)
]
records = [
{
'Data': json.dumps(d).encode('utf-8'),
'PartitionKey': d['device_id']
}
for d in donnees
]
response = kinesis.put_records(
StreamName='donnees-sante',
Records=records
)
print(f"Enregistrements OK: {response['Records'].__len__()} envoyés")
print(f"Échecs: {response['FailedRecordCount']}")
# Gérer les échecs partiels (put_records peut avoir des succès partiels)
failed_records = [
records[i] for i, r in enumerate(response['Records'])
if 'ErrorCode' in r
]
if failed_records:
print(f"Retry nécessaire pour {len(failed_records)} enregistrements")3. Consommateurs — Lire les Données
Mode Standard (GetRecords)
NOTEGetRecords Standard — Tous les consommateurs partagent la limite de 2 MB/s par shard. Si vous avez 3 consommateurs sur 1 shard, chacun n'obtient que ~667 KB/s.
# Consommateur standard avec GetRecords
import boto3, json, time
kinesis = boto3.client('kinesis', region_name='ca-central-1')
STREAM_NAME = 'donnees-sante'
# 1. Lister les shards
response = kinesis.describe_stream(StreamName=STREAM_NAME)
shards = response['StreamDescription']['Shards']
for shard in shards:
shard_id = shard['ShardId']
# 2. Obtenir un itérateur de shard (LATEST = nouvelles données seulement)
iterator_response = kinesis.get_shard_iterator(
StreamName=STREAM_NAME,
ShardId=shard_id,
ShardIteratorType='LATEST' # Ou 'TRIM_HORIZON' pour lire depuis le début
)
shard_iterator = iterator_response['ShardIterator']
# 3. Boucle de lecture
while True:
records_response = kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100 # Max 100 enregistrements par appel (ou 10MB)
)
for record in records_response['Records']:
data = json.loads(record['Data'].decode('utf-8'))
print(f"Device: {data['device_id']}, HR: {data['heart_rate']}")
shard_iterator = records_response['NextShardIterator']
# Limitation : max 5 appels GetRecords/s par shard
if not records_response['Records']:
time.sleep(1) # Attendre si pas de nouvelles donnéesEnhanced Fan-Out — Consommateurs Dédiés
TIPEnhanced Fan-Out — Chaque consommateur enregistré obtient sa propre capacité de 2 MB/s par shard (au lieu de partager). Utilise HTTP/2 push au lieu de polling. Idéal pour les applications critiques nécessitant une faible latence (<70ms).
# Enregistrer un consommateur Enhanced Fan-Out
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante \
--consumer-name alertes-medicales-temps-reel
# Lister les consommateurs enregistrés
aws kinesis list-stream-consumers \
--stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante# Consommateur avec Enhanced Fan-Out (SubscribeToShard)
import boto3, json
kinesis = boto3.client('kinesis', region_name='ca-central-1')
STREAM_ARN = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante'
CONSUMER_ARN = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante/consumer/alertes-medicales:1234567890'
SHARD_ID = 'shardId-000000000000'
# SubscribeToShard avec HTTP/2 streaming (push au lieu de polling)
response = kinesis.subscribe_to_shard(
ConsumerARN=CONSUMER_ARN,
ShardId=SHARD_ID,
StartingPosition={'Type': 'LATEST'}
)
# Traiter le flux d'événements en temps réel
event_stream = response['EventStream']
for event in event_stream:
if 'SubscribeToShardEvent' in event:
for record in event['SubscribeToShardEvent']['Records']:
data = json.loads(record['Data'].decode('utf-8'))
print(f"[Enhanced Fan-Out] Device: {data['device_id']}, HR: {data['heart_rate']}")Chapitre 02 – Leçon 1 : Amazon OpenSearch — Indexation et Recherche en Temps Réel
NOTEObjectif — Comprendre le fonctionnement d'Amazon OpenSearch Service : comment créer des index, définir des mappings, et effectuer des recherches et agrégations en temps réel sur des données de streaming.
1. OpenSearch — Concepts Fondamentaux
NOTEOpenSearch — Fork open-source d'Elasticsearch (développé par AWS depuis 2021). Moteur de recherche et d'analytics distribué basé sur Apache Lucene. Amazon OpenSearch Service est la version managée sur AWS.
Terminologie
| Index | ≈ Table d'une base de données |
| Document | ≈ Ligne dans une table (JSON) |
| Field | ≈ Colonne dans une table |
| Shard | Fragment d'un index (distribution) |
| Replica | Copie d'un shard (haute disponibilité) |
| Mapping | Schéma des types de champs |
Quand Utiliser OpenSearch
2. Créer un Domaine OpenSearch
# Créer un domaine OpenSearch pour données IoT
aws opensearch create-domain \
--domain-name sante-iot-dashboard \
--engine-version 'OpenSearch_2.11' \
--cluster-config '{
"InstanceType": "t3.small.search",
"InstanceCount": 2,
"DedicatedMasterEnabled": false,
"ZoneAwarenessEnabled": true,
"ZoneAwarenessConfig": {"AvailabilityZoneCount": 2}
}' \
--ebs-options '{
"EBSEnabled": true,
"VolumeType": "gp3",
"VolumeSize": 20,
"Iops": 3000
}' \
--encryption-at-rest-options '{"Enabled": true}' \
--node-to-node-encryption-options '{"Enabled": true}' \
--domain-endpoint-options '{"EnforceHTTPS": true, "TLSSecurityPolicy": "Policy-Min-TLS-1-2-2019-07"}' \
--advanced-security-options '{
"Enabled": true,
"InternalUserDatabaseEnabled": true,
"MasterUserOptions": {
"MasterUserName": "admin",
"MasterUserPassword": "VotreMotDePasseSecurise#123"
}
}'
# Vérifier le statut du domaine
aws opensearch describe-domain \
--domain-name sante-iot-dashboard \
--query "DomainStatus.Processing"3. Créer un Index avec Mapping
NOTEMapping — Le mapping définit les types de données de chaque champ. OpenSearch peut auto-détecter les types (dynamic mapping), mais il est recommandé de définir explicitement le mapping pour les champs critiques afin d'éviter les erreurs de typage.
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
# Connexion au domaine OpenSearch
region = 'ca-central-1'
service = 'es'
host = 'https://votre-domaine.ca-central-1.es.amazonaws.com'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
region, service,
session_token=credentials.token
)
client = OpenSearch(
hosts=[{'host': host.replace('https://', ''), 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
# Créer l'index avec un mapping explicite
mapping = {
"settings": {
"number_of_shards": 2, # Distribuer sur 2 nœuds
"number_of_replicas": 1 # 1 copie pour la HA
},
"mappings": {
"properties": {
"device_id": {"type": "keyword"}, # Valeur exacte (non analysée)
"patient_id": {"type": "keyword"},
"timestamp": {"type": "date", "format": "epoch_second"},
"heart_rate": {"type": "integer"},
"spo2": {"type": "float"},
"temperature": {"type": "float"},
"steps": {"type": "integer"},
"severite": {"type": "keyword"},
"region": {"type": "keyword"},
"processed_at":{"type": "date"},
# Champ texte analysé pour la recherche full-text
"notes": {"type": "text", "analyzer": "french"}
}
}
}
response = client.indices.create(index='donnees-cardiaques', body=mapping)
print(f"Index créé : {response['acknowledged']}")
# Vérifier le mapping
mapping_info = client.indices.get_mapping(index='donnees-cardiaques')
print(f"Mapping : {list(mapping_info['donnees-cardiaques']['mappings']['properties'].keys())}")4. Indexer des Documents
import time, json, random
# Indexer un document individuel
document = {
"device_id": "WATCH-042",
"patient_id": "PATIENT-42",
"timestamp": int(time.time()),
"heart_rate": 88,
"spo2": 97.5,
"temperature": 37.1,
"steps": 12453,
"severite": "NORMAL",
"region": "ca-central-1"
}
response = client.index(
index='donnees-cardiaques',
body=document,
id=f"{document['device_id']}-{document['timestamp']}", # ID unique
refresh=True # Rend le document immédiatement recherchable
)
print(f"Document indexé : {response['result']}")
# Indexation en bulk (plus efficace pour de grands volumes)
from opensearchpy.helpers import bulk
def generer_donnees(n_montres=100):
"""Générer n enregistrements de montres cardiaques"""
actions = []
for i in range(n_montres):
heart_rate = random.randint(55, 115)
spo2 = random.uniform(93, 100)
doc = {
"_index": "donnees-cardiaques",
"_id": f"WATCH-{i:03d}-{int(time.time())}",
"_source": {
"device_id": f"WATCH-{i:03d}",
"patient_id": f"PATIENT-{i}",
"timestamp": int(time.time()),
"heart_rate": heart_rate,
"spo2": round(spo2, 1),
"temperature": round(random.uniform(36.0, 37.8), 1),
"steps": random.randint(0, 20000),
"severite": "CRITIQUE" if heart_rate > 140 else "ATTENTION" if heart_rate > 110 else "NORMAL",
"region": "ca-central-1"
}
}
actions.append(doc)
return actions
# Indexation bulk
success, errors = bulk(client, generer_donnees(100))
print(f"Indexé : {success} documents, {len(errors)} erreurs")5. Recherches et Agrégations
Recherche Simple (Match Query)
# Trouver tous les enregistrements CRITIQUES
query_critiques = {
"query": {
"term": {"severite": "CRITIQUE"}
},
"sort": [{"timestamp": {"order": "desc"}}],
"size": 20
}
response = client.search(index='donnees-cardiaques', body=query_critiques)
print(f"Enregistrements critiques : {response['hits']['total']['value']}")
for hit in response['hits']['hits']:
print(f" Device: {hit['_source']['device_id']}, HR: {hit['_source']['heart_rate']}")Recherche avec Filtre sur Plage (Range Query)
# Trouver les anomalies cardiaques des 30 dernières minutes
now = int(time.time())
thirty_min_ago = now - 1800
query_anomalies = {
"query": {
"bool": {
"must": [
{
"range": {
"timestamp": {
"gte": thirty_min_ago,
"lte": now
}
}
}
],
"should": [
{"range": {"heart_rate": {"gt": 120}}}, # Tachycardie
{"range": {"heart_rate": {"lt": 45}}}, # Bradycardie
{"range": {"spo2": {"lt": 90}}} # Hypoxémie
],
"minimum_should_match": 1
}
},
"sort": [{"timestamp": {"order": "desc"}}]
}
response = client.search(index='donnees-cardiaques', body=query_anomalies)
print(f"Anomalies détectées : {response['hits']['total']['value']}")Agrégations — Statistiques en Temps Réel
# Agrégation : statistiques cardiaques par device (dernières 24h)
query_stats = {
"query": {
"range": {
"timestamp": {"gte": "now-24h/h", "lte": "now"}
}
},
"aggs": {
"par_device": {
"terms": {"field": "device_id", "size": 50},
"aggs": {
"fc_moyenne": {"avg": {"field": "heart_rate"}},
"fc_max": {"max": {"field": "heart_rate"}},
"fc_min": {"min": {"field": "heart_rate"}},
"spo2_moyenne":{"avg": {"field": "spo2"}},
"nb_critiques":{
"filter": {"term": {"severite": "CRITIQUE"}}
}
}
},
"par_severite": {
"terms": {"field": "severite"},
"aggs": {
"nb_patients": {"cardinality": {"field": "patient_id"}}
}
},
"tendance_horaire": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "1h",
"format": "yyyy-MM-dd HH:mm"
},
"aggs": {
"fc_moyenne": {"avg": {"field": "heart_rate"}}
}
}
},
"size": 0 # Ne retourner que les agrégations, pas les documents
}
response = client.search(index='donnees-cardiaques', body=query_stats)
buckets = response['aggregations']['par_severite']['buckets']
for b in buckets:
print(f"Sévérité {b['key']}: {b['doc_count']} lectures, {b['nb_patients']['value']} patients")Chapitre 02 – Leçon 2 : Amazon Cognito — Authentification pour OpenSearch Dashboards
NOTEObjectif — Configurer Amazon Cognito pour sécuriser l'accès aux OpenSearch Dashboards, créer des rôles distincts pour différents types d'utilisateurs (administrateurs, data scientists, opérateurs de monitoring), et appliquer un contrôle d'accès précis aux index.
1. Pourquoi Cognito pour OpenSearch ?
NOTEProblème — Par défaut, OpenSearch Dashboards utilise une authentification HTTP basique (nom d'utilisateur/mot de passe). Ce n'est pas adapté pour une organisation avec des dizaines d'utilisateurs, des profils différents et des besoins d'authentification SSO.
Sans Cognito
Avec Cognito
2. Architecture Cognito + OpenSearch
# Architecture d'authentification # UTILISATEUR # ↓ (1) Accède à OpenSearch Dashboards # AMAZON COGNITO USER POOL # ↓ (2) Vérifie les credentials (ou SSO via IdP) # ↓ (3) Émet un token JWT # AMAZON COGNITO IDENTITY POOL # ↓ (4) Échange le JWT contre des credentials AWS temporaires (STS) # ↓ (5) Assigne un rôle IAM selon le groupe Cognito # AMAZON OPENSEARCH SERVICE # ↓ (6) Vérifie les permissions Fine-Grained Access Control (FGAC) # DONNÉES DE L'INDEX
3. Créer le User Pool Cognito
# Étape 1 : Créer le User Pool
aws cognito-idp create-user-pool \
--pool-name opensearch-sante-users \
--policies '{
"PasswordPolicy": {
"MinimumLength": 12,
"RequireUppercase": true,
"RequireLowercase": true,
"RequireNumbers": true,
"RequireSymbols": true
}
}' \
--auto-verified-attributes email \
--mfa-configuration OPTIONAL \
--sms-configuration '{"SnsCallerArn": "arn:aws:iam::123456789012:role/CognitoSNSRole", "ExternalId": "extId"}' \
--query "UserPool.Id" --output text
# → us-east-1_XXXXXXXXX (notez cet ID)
# Étape 2 : Créer le App Client (sans secret pour Dashboards)
aws cognito-idp create-user-pool-client \
--user-pool-id us-east-1_XXXXXXXXX \
--client-name opensearch-dashboards-client \
--no-generate-secret \
--explicit-auth-flows ALLOW_USER_PASSWORD_AUTH ALLOW_REFRESH_TOKEN_AUTH \
--supported-identity-providers COGNITO \
--query "UserPoolClient.ClientId" --output text
# → YYYYYYYYYYYYYYYYYY (notez cet ID)Créer les Groupes d'Utilisateurs
# Groupe 1 : Administrateurs OpenSearch (accès total)
aws cognito-idp create-group \
--user-pool-id us-east-1_XXXXXXXXX \
--group-name opensearch-admins \
--description "Accès administrateur complet à OpenSearch" \
--role-arn arn:aws:iam::123456789012:role/OpenSearch-AdminRole
# Groupe 2 : Data Scientists (lecture seule sur tous les index)
aws cognito-idp create-group \
--user-pool-id us-east-1_XXXXXXXXX \
--group-name opensearch-scientists \
--description "Lecture seule pour analyse" \
--role-arn arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole
# Groupe 3 : Opérateurs de monitoring (lecture sur l'index monitoring seulement)
aws cognito-idp create-group \
--user-pool-id us-east-1_XXXXXXXXX \
--group-name opensearch-operators \
--description "Monitoring opérationnel uniquement" \
--role-arn arn:aws:iam::123456789012:role/OpenSearch-OperatorRole
# Créer un utilisateur et l'ajouter à un groupe
aws cognito-idp admin-create-user \
--user-pool-id us-east-1_XXXXXXXXX \
--username dr.dupont@hopital.ca \
--user-attributes Name=email,Value=dr.dupont@hopital.ca Name=email_verified,Value=true \
--temporary-password TempPassword123!
aws cognito-idp admin-add-user-to-group \
--user-pool-id us-east-1_XXXXXXXXX \
--username dr.dupont@hopital.ca \
--group-name opensearch-scientists4. Créer l'Identity Pool
# Créer l'Identity Pool (lie le User Pool aux rôles IAM)
aws cognito-identity create-identity-pool \
--identity-pool-name opensearch-sante-identity \
--allow-unauthenticated-identities false \
--cognito-identity-providers '[
{
"ProviderName": "cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX",
"ClientId": "YYYYYYYYYYYYYYYYYY",
"ServerSideTokenCheck": true
}
]' \
--query "IdentityPoolId" --output text
# → ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ
# Associer les rôles IAM à l'Identity Pool
aws cognito-identity set-identity-pool-roles \
--identity-pool-id "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ" \
--roles authenticated=arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole \
--role-mappings '{
"cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX:YYYYYYYYYYYYYYYYYY": {
"Type": "Rules",
"AmbiguousRoleResolution": "AuthenticatedRole",
"RulesConfiguration": {
"Rules": [
{
"Claim": "cognito:groups",
"MatchType": "Contains",
"Value": "opensearch-admins",
"RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-AdminRole"
},
{
"Claim": "cognito:groups",
"MatchType": "Contains",
"Value": "opensearch-operators",
"RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-OperatorRole"
}
]
}
}
}'5. Fine-Grained Access Control dans OpenSearch
NOTEFine-Grained Access Control (FGAC) — OpenSearch permet de contrôler les accès au niveau de l'index, du type de document, et même des champs individuels. Cela permet de créer des rôles personnalisés qui n'exposent que les données nécessaires.
# Configurer les rôles OpenSearch via l'API REST
import requests, json
host = 'https://votre-domaine.ca-central-1.es.amazonaws.com'
auth = ('admin', 'VotreMotDePasseSecurise#123')
# Rôle 1 : Lecture seule sur l'index données-cardiaques
roles_payload = {
"cluster_permissions": ["cluster:monitor/main"],
"index_permissions": [
{
"index_patterns": ["donnees-cardiaques*"],
"allowed_actions": [
"read",
"indices:data/read/search",
"indices:data/read/msearch",
"indices:admin/mappings/get"
]
}
],
"tenant_permissions": [
{
"tenant_patterns": ["global_tenant"],
"allowed_actions": ["kibana_all_read"]
}
]
}
# Créer le rôle "data-scientist-reader"
requests.put(
f"{host}/_plugins/_security/api/roles/data-scientist-reader",
json=roles_payload, auth=auth
)
# Mapper le rôle IAM Cognito vers le rôle OpenSearch
role_mapping = {
"backend_roles": [
"arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole"
],
"description": "Data Scientists ont accès en lecture aux données cardiaques"
}
requests.put(
f"{host}/_plugins/_security/api/rolesmapping/data-scientist-reader",
json=role_mapping, auth=auth
)
print("Rôle et mapping créés avec succès")6. Activer Cognito dans OpenSearch
# Activer l'authentification Cognito sur le domaine OpenSearch
aws opensearch update-domain-config \
--domain-name sante-iot-dashboard \
--cognito-options '{
"Enabled": true,
"UserPoolId": "ca-central-1_XXXXXXXXX",
"IdentityPoolId": "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ",
"RoleArn": "arn:aws:iam::123456789012:role/CognitoAccessForAmazonOpenSearch"
}'
# Attendre que la mise à jour soit terminée
aws opensearch describe-domain \
--domain-name sante-iot-dashboard \
--query "DomainStatus.Processing"
# Attendre que la valeur soit "false"
# Récupérer l'URL des Dashboards
aws opensearch describe-domain \
--domain-name sante-iot-dashboard \
--query "DomainStatus.Endpoints"va-plus-loin
Cet article couvre les extraits les plus utiles — le cours complet AWS Données Temps Réel (6 chapitres, 14 leçons, exercices corrigés et projet final) t'emmène jusqu'au bout.
./acceder-au-cours-complet cours gratuit : Maîtriser Claude CodeFAQ
Combien de temps pour apprendre AWS Données Temps Réel ?
Avec une progression structurée (6 chapitres, 14 leçons courtes et pratiques), on atteint un niveau opérationnel en quelques semaines à raison de 30 à 60 minutes par jour. L'important est de pratiquer chaque notion immédiatement.
Faut-il des prérequis ?
Des bases en informatique suffisent. Si tu sais utiliser un terminal et lire du code simple, tu es prêt.
Par où commencer concrètement ?
Reproduis les commandes de cet article, puis suis le cours complet AWS Données Temps Réel : il enchaîne les 14 leçons dans l'ordre, avec exercices et projet final.
./a-lire-aussi
→ AWS Data Engineering Bootcamp expliqué simplement (avec schémas et vrai code)→ Python Data Science : les 9 étapes clés pour passer de zéro à opérationnel→ Python NumPy en pratique : le code et les commandes qui comptent vraiment📬 Tu veux recevoir ce type de guide chaque semaine ? Abonne-toi gratuitement — code réel, zéro blabla.