Lánzate a AWS Datos en Tiempo Real: tu primer paso concreto hoy
AWS Datos en Tiempo Real: lo esencial en un artículo — código real, diagramas y pasos concretos, extractos de un curso de 14 lecciones.
La mejor forma de aprender AWS Datos en Tiempo Real es practicando. Este artículo te da un empujón con extractos prácticos extraídos de un curso de 14 lecciones — lo suficiente para obtener un primer resultado hoy mismo.
tl;dr
- Introducción al Streaming de Datos AWS
- Amazon Kinesis
- Amazon OpenSearch
- Seguridad y Cifrado
- Kafka vs Kinesis y Alternativas
~$ cat ./parcours.md # AWS Données Temps Réel — 6 capítulos
01
Introducción al Streaming de Datos AWS
→ Capítulo 00 – Lección 1 : Datos en Tiempo Real vs Procesamiento por Lotes (Batch)→ Capítulo 00 – Lección 2 : El Ecosistema AWS para el Streaming de Datos
02
Amazon Kinesis
→ Capítulo 01 – Lección 1 : Amazon Kinesis Data Streams — Shards y Consumidores→ Capítulo 01 – Lección 2 : Kinesis Firehose — Entrega Gestionada y Transformación+ 1 otras lecciones
03
Amazon OpenSearch
→ Capítulo 02 – Lección 1 : Amazon OpenSearch — Indexación y Búsqueda en Tiempo Real→ Capítulo 02 – Lección 2 : Amazon Cognito — Autenticación para OpenSearch Dashboards+ 1 otras lecciones
04
Seguridad y Encriptación
→ Capítulo 03 – Lección 1 : AWS KMS — Cifrado de Datos en Kinesis y Firehose→ Capítulo 03 – Lección 2 : IAM, VPC y Buenas Prácticas de Seguridad para el Streaming
05
Kafka vs Kinesis y Alternativas
→ Capítulo 04 – Lección 1 : Apache Kafka vs Amazon Kinesis — Comparación Completa→ Capítulo 04 – Lección 2 : Migración de Kafka a Kinesis / MSK y Patrones Híbridos
06
Arquitecturas de Referencia
→ Capítulo 05 – Lección 1 : Arquitectura de Referencia — IoT Wearables Médicos→ Capítulo 05 – Lección 2 : Arquitectura de Referencia — Detección de Fraude E-Commerce en Tiempo Real
🏁
Proyecto final
→ Te vas con un proyecto concreto y demostrable
Capítulo 01 – Lección 1 : Amazon Kinesis Data Streams — Shards y Consumidores
NOTEObjetivo — Comprender la arquitectura interna de Kinesis Data Streams: cómo los shards permiten el paralelismo, cómo los productores publican datos y cómo los consumidores los procesan — con y sin Enhanced Fan-Out.
1. Arquitectura de los Shards — La Unidad Básica
NOTEShard — Un shard es una unidad de capacidad aislada en un flujo Kinesis. Cada shard puede ingerir hasta 1 MB/s o 1 000 registros/s en entrada y proporcionar hasta 2 MB/s en salida (por consumidor).
# Capacidad por shard
# ─────────────────────────────────────
# Escritura (PUT) : 1 MB/s O 1 000 records/s (se aplica el límite más bajo)
# Lectura (GET) : 2 MB/s por GetRecords() - compartida entre TODOS los consumidores
#
# → Para 5 MB/s de ingestión : necesitas 5 shards
# → Para 10 000 records/s : necesitas 10 shards
# Calcular el número de shards necesarios
max_ingestion_rate = 5 # MB/s
max_record_rate = 3000 # records/s
shards_pour_volume = max_ingestion_rate / 1 # = 5 shards
shards_pour_taux = max_record_rate / 1000 # = 3 shards
nb_shards = max(shards_pour_volume, shards_pour_taux)
print(f"Shards nécessaires : {int(nb_shards)}") # → 5 shardsPartition Key — Cómo se distribuyen los datos
NOTEPartition Key — Cada registro posee una clave de partición. Kinesis aplica una función de hash MD5 sobre esta clave para determinar en qué shard se enruta el registro. Los registros con la misma clave de partición siempre van al mismo shard (orden garantizado).
# Ejemplos de Partition Keys y su impacto
# ✅ BUENO : clave de alta cardinalidad → distribución uniforme entre shards
# device_id = "WATCH-001", "WATCH-002", ..., "WATCH-10000"
kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(payload),
PartitionKey=payload['device_id'] # Miles de valores únicos
)
# ⚠️ PROBLEMA : clave de baja cardinalidad → "hot shard" (un shard sobrecargado)
# Si todos los registros usan la misma clave :
kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(payload),
PartitionKey='donnees-sante' # ← ¡Siempre el mismo shard! Cuello de botella
)
# ✅ SOLUCIÓN si no hay clave natural : clave aleatoria
import uuid
kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(payload),
PartitionKey=str(uuid.uuid4()) # Aleatorio = distribución perfecta
# ¡Atención : pérdida del orden en el shard!
)2. Productores — Enviar datos
put_record vs put_records
import boto3, json, time
kinesis = boto3.client('kinesis', region_name='ca-central-1')
# Método 1 : put_record — un solo registro
# Latencia : ~70ms por llamada
donnee = {
"device_id": "WATCH-001",
"heart_rate": 72,
"timestamp": int(time.time())
}
response = kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(donnee).encode('utf-8'),
PartitionKey=donnee['device_id']
)
print(f"ShardId: {response['ShardId']}")
# ─────────────────────────────────────
# Método 2 : put_records — varios registros en batch (RECOMENDADO)
# Latencia : ~70ms para hasta 500 registros !
# Límite : máx 500 registros o 5MB por llamada
donnees = [
{"device_id": f"WATCH-{i:03d}", "heart_rate": 60 + i, "timestamp": int(time.time())}
for i in range(10)
]
records = [
{
'Data': json.dumps(d).encode('utf-8'),
'PartitionKey': d['device_id']
}
for d in donnees
]
response = kinesis.put_records(
StreamName='donnees-sante',
Records=records
)
print(f"Enregistrements OK: {response['Records'].__len__()} envoyés")
print(f"Échecs: {response['FailedRecordCount']}")
# Gestionar fallos parciales (put_records puede tener éxitos parciales)
failed_records = [
records[i] for i, r in enumerate(response['Records'])
if 'ErrorCode' in r
]
if failed_records:
print(f"Retry nécessaire pour {len(failed_records)} enregistrements")3. Consumidores — Leer los datos
Modo estándar (GetRecords)
NOTEGetRecords Standard — Todos los consumidores comparten el límite de 2 MB/s por shard. Si tienes 3 consumidores en 1 shard, cada uno obtiene solo ~667 KB/s.
# Consumidor estándar con GetRecords
import boto3, json, time
kinesis = boto3.client('kinesis', region_name='ca-central-1')
STREAM_NAME = 'donnees-sante'
# 1. Listar los shards
response = kinesis.describe_stream(StreamName=STREAM_NAME)
shards = response['StreamDescription']['Shards']
for shard in shards:
shard_id = shard['ShardId']
# 2. Obtener un iterador de shard (LATEST = solo datos nuevos)
iterator_response = kinesis.get_shard_iterator(
StreamName=STREAM_NAME,
ShardId=shard_id,
ShardIteratorType='LATEST' # O 'TRIM_HORIZON' para leer desde el principio
)
shard_iterator = iterator_response['ShardIterator']
# 3. Bucle de lectura
while True:
records_response = kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100 # Máx 100 registros por llamada (o 10MB)
)
for record in records_response['Records']:
data = json.loads(record['Data'].decode('utf-8'))
print(f"Device: {data['device_id']}, HR: {data['heart_rate']}")
shard_iterator = records_response['NextShardIterator']
# Limitación : máx 5 llamadas GetRecords/s por shard
if not records_response['Records']:
time.sleep(1) # Esperar si no hay datos nuevosEnhanced Fan-Out — Consumidores Dedicados
TIPEnhanced Fan-Out — Cada consumidor registrado obtiene su propia capacidad de 2 MB/s por shard (en lugar de compartir). Usa HTTP/2 push en lugar de polling. Ideal para aplicaciones críticas que requieren baja latencia (<70ms).
# Registrar un consumidor Enhanced Fan-Out
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante \
--consumer-name alertes-medicales-temps-reel
# Listar los consumidores registrados
aws kinesis list-stream-consumers \
--stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante# Consumidor con Enhanced Fan-Out (SubscribeToShard)
import boto3, json
kinesis = boto3.client('kinesis', region_name='ca-central-1')
STREAM_ARN = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante'
CONSUMER_ARN = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante/consumer/alertes-medicales:1234567890'
SHARD_ID = 'shardId-000000000000'
# SubscribeToShard con HTTP/2 streaming (push en lugar de polling)
response = kinesis.subscribe_to_shard(
ConsumerARN=CONSUMER_ARN,
ShardId=SHARD_ID,
StartingPosition={'Type': 'LATEST'}
)
# Procesar el flujo de eventos en tiempo real
event_stream = response['EventStream']
for event in event_stream:
if 'SubscribeToShardEvent' in event:
for record in event['SubscribeToShardEvent']['Records']:
data = json.loads(record['Data'].decode('utf-8'))
print(f"[Enhanced Fan-Out] Device: {data['device_id']}, HR: {data['heart_rate']}")Capítulo 02 – Lección 1 : Amazon OpenSearch — Indexación y Búsqueda en Tiempo Real
NOTEObjetivo — Comprender el funcionamiento de Amazon OpenSearch Service: cómo crear índices, definir mappings y realizar búsquedas y agregaciones en tiempo real sobre datos de streaming.
1. OpenSearch — Conceptos Fundamentales
NOTEOpenSearch — Fork open-source de Elasticsearch (desarrollado por AWS desde 2021). Motor de búsqueda y analítica distribuida basado en Apache Lucene. Amazon OpenSearch Service es la versión gestionada en AWS.
Terminología
| Index | ≈ Tabla de una base de datos |
| Document | ≈ Fila en una tabla (JSON) |
| Field | ≈ Columna en una tabla |
| Shard | Fragmento de un índice (distribución) |
| Replica | Copia de un shard (alta disponibilidad) |
| Mapping | Esquema de los tipos de campos |
Cuándo usar OpenSearch
2. Crear un dominio OpenSearch
# Crear un dominio OpenSearch para datos IoT
aws opensearch create-domain \
--domain-name sante-iot-dashboard \
--engine-version 'OpenSearch_2.11' \
--cluster-config '{
"InstanceType": "t3.small.search",
"InstanceCount": 2,
"DedicatedMasterEnabled": false,
"ZoneAwarenessEnabled": true,
"ZoneAwarenessConfig": {"AvailabilityZoneCount": 2}
}' \
--ebs-options '{
"EBSEnabled": true,
"VolumeType": "gp3",
"VolumeSize": 20,
"Iops": 3000
}' \
--encryption-at-rest-options '{"Enabled": true}' \
--node-to-node-encryption-options '{"Enabled": true}' \
--domain-endpoint-options '{"EnforceHTTPS": true, "TLSSecurityPolicy": "Policy-Min-TLS-1-2-2019-07"}' \
--advanced-security-options '{
"Enabled": true,
"InternalUserDatabaseEnabled": true,
"MasterUserOptions": {
"MasterUserName": "admin",
"MasterUserPassword": "VotreMotDePasseSecurise#123"
}
}'
# Verificar el estado del dominio
aws opensearch describe-domain \
--domain-name sante-iot-dashboard \
--query "DomainStatus.Processing"3. Crear un índice con Mapping
NOTEMapping — El mapping define los tipos de datos de cada campo. OpenSearch puede autodetectar los tipos (dynamic mapping), pero se recomienda definir explícitamente el mapping para los campos críticos a fin de evitar errores de tipado.
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
# Conexión al dominio OpenSearch
region = 'ca-central-1'
service = 'es'
host = 'https://votre-domaine.ca-central-1.es.amazonaws.com'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
region, service,
session_token=credentials.token
)
client = OpenSearch(
hosts=[{'host': host.replace('https://', ''), 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
# Crear el índice con un mapping explícito
mapping = {
"settings": {
"number_of_shards": 2, # Distribuir en 2 nodos
"number_of_replicas": 1 # 1 copia para HA
},
"mappings": {
"properties": {
"device_id": {"type": "keyword"}, # Valor exacto (no analizado)
"patient_id": {"type": "keyword"},
"timestamp": {"type": "date", "format": "epoch_second"},
"heart_rate": {"type": "integer"},
"spo2": {"type": "float"},
"temperature": {"type": "float"},
"steps": {"type": "integer"},
"severite": {"type": "keyword"},
"region": {"type": "keyword"},
"processed_at":{"type": "date"},
# Campo de texto analizado para búsqueda full-text
"notes": {"type": "text", "analyzer": "french"}
}
}
}
response = client.indices.create(index='donnees-cardiaques', body=mapping)
print(f"Index créé : {response['acknowledged']}")
# Verificar el mapping
mapping_info = client.indices.get_mapping(index='donnees-cardiaques')
print(f"Mapping : {list(mapping_info['donnees-cardiaques']['mappings']['properties'].keys())}")4. Indexar documentos
import time, json, random
# Indexar un documento individual
document = {
"device_id": "WATCH-042",
"patient_id": "PATIENT-42",
"timestamp": int(time.time()),
"heart_rate": 88,
"spo2": 97.5,
"temperature": 37.1,
"steps": 12453,
"severite": "NORMAL",
"region": "ca-central-1"
}
response = client.index(
index='donnees-cardiaques',
body=document,
id=f"{document['device_id']}-{document['timestamp']}", # ID único
refresh=True # Hace el documento inmediatamente buscable
)
print(f"Document indexé : {response['result']}")
# Indexación en bulk (más eficiente para grandes volúmenes)
from opensearchpy.helpers import bulk
def generer_donnees(n_montres=100):
"""Generar n registros de relojes cardíacos"""
actions = []
for i in range(n_montres):
heart_rate = random.randint(55, 115)
spo2 = random.uniform(93, 100)
doc = {
"_index": "donnees-cardiaques",
"_id": f"WATCH-{i:03d}-{int(time.time())}",
"_source": {
"device_id": f"WATCH-{i:03d}",
"patient_id": f"PATIENT-{i}",
"timestamp": int(time.time()),
"heart_rate": heart_rate,
"spo2": round(spo2, 1),
"temperature": round(random.uniform(36.0, 37.8), 1),
"steps": random.randint(0, 20000),
"severite": "CRITIQUE" if heart_rate > 140 else "ATTENTION" if heart_rate > 110 else "NORMAL",
"region": "ca-central-1"
}
}
actions.append(doc)
return actions
# Indexación bulk
success, errors = bulk(client, generer_donnees(100))
print(f"Indexé : {success} documents, {len(errors)} erreurs")5. Búsquedas y agregaciones
Búsqueda simple (Match Query)
# Encontrar todos los registros CRÍTICOS
query_critiques = {
"query": {
"term": {"severite": "CRITIQUE"}
},
"sort": [{"timestamp": {"order": "desc"}}],
"size": 20
}
response = client.search(index='donnees-cardiaques', body=query_critiques)
print(f"Enregistrements critiques : {response['hits']['total']['value']}")
for hit in response['hits']['hits']:
print(f" Device: {hit['_source']['device_id']}, HR: {hit['_source']['heart_rate']}")Búsqueda con filtro de rango (Range Query)
# Encontrar anomalías cardíacas de los últimos 30 minutos
now = int(time.time())
thirty_min_ago = now - 1800
query_anomalies = {
"query": {
"bool": {
"must": [
{
"range": {
"timestamp": {
"gte": thirty_min_ago,
"lte": now
}
}
}
],
"should": [
{"range": {"heart_rate": {"gt": 120}}}, # Taquicardia
{"range": {"heart_rate": {"lt": 45}}}, # Bradiarritmia
{"range": {"spo2": {"lt": 90}}} # Hipoxemia
],
"minimum_should_match": 1
}
},
"sort": [{"timestamp": {"order": "desc"}}]
}
response = client.search(index='donnees-cardiaques', body=query_anomalies)
print(f"Anomalies détectées : {response['hits']['total']['value']}")Agregaciones — Estadísticas en tiempo real
# Agregación : estadísticas cardíacas por dispositivo (últimas 24h)
query_stats = {
"query": {
"range": {
"timestamp": {"gte": "now-24h/h", "lte": "now"}
}
},
"aggs": {
"par_device": {
"terms": {"field": "device_id", "size": 50},
"aggs": {
"fc_moyenne": {"avg": {"field": "heart_rate"}},
"fc_max": {"max": {"field": "heart_rate"}},
"fc_min": {"min": {"field": "heart_rate"}},
"spo2_moyenne":{"avg": {"field": "spo2"}},
"nb_critiques":{
"filter": {"term": {"severite": "CRITIQUE"}}
}
}
},
"par_severite": {
"terms": {"field": "severite"},
"aggs": {
"nb_patients": {"cardinality": {"field": "patient_id"}}
}
},
"tendance_horaire": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "1h",
"format": "yyyy-MM-dd HH:mm"
},
"aggs": {
"fc_moyenne": {"avg": {"field": "heart_rate"}}
}
}
},
"size": 0 # No devolver más que las agregaciones, no los documentos
}
response = client.search(index='donnees-cardiaques', body=query_stats)
buckets = response['aggregations']['par_severite']['buckets']
for b in buckets:
print(f"Sévérité {b['key']}: {b['doc_count']} lectures, {b['nb_patients']['value']} patients")Capítulo 02 – Lección 2 : Amazon Cognito — Autenticación para OpenSearch Dashboards
NOTEObjetivo — Configurar Amazon Cognito para securizar el acceso a OpenSearch Dashboards, crear roles distintos para diferentes tipos de usuarios (administradores, data scientists, operadores de monitorización) y aplicar un control de acceso preciso a los índices.
1. ¿Por qué Cognito para OpenSearch?
NOTEProblema — Por defecto, OpenSearch Dashboards utiliza una autenticación HTTP básica (nombre de usuario/contraseña). No es adecuada para una organización con decenas de usuarios, perfiles diferentes y necesidades de autenticación SSO.
Sin Cognito
Con Cognito
2. Arquitectura Cognito + OpenSearch
# Arquitectura de autenticación # USUARIO # ↓ (1) Accede a OpenSearch Dashboards # AMAZON COGNITO USER POOL # ↓ (2) Verifica las credenciales (o SSO vía IdP) # ↓ (3) Emite un token JWT # AMAZON COGNITO IDENTITY POOL # ↓ (4) Intercambia el JWT por credenciales AWS temporales (STS) # ↓ (5) Asigna un rol IAM según el grupo Cognito # AMAZON OPENSEARCH SERVICE # ↓ (6) Verifica los permisos Fine-Grained Access Control (FGAC) # DATOS DEL ÍNDICE
3. Crear el User Pool Cognito
# Paso 1 : Crear el User Pool
aws cognito-idp create-user-pool \
--pool-name opensearch-sante-users \
--policies '{
"PasswordPolicy": {
"MinimumLength": 12,
"RequireUppercase": true,
"RequireLowercase": true,
"RequireNumbers": true,
"RequireSymbols": true
}
}' \
--auto-verified-attributes email \
--mfa-configuration OPTIONAL \
--sms-configuration '{"SnsCallerArn": "arn:aws:iam::123456789012:role/CognitoSNSRole", "ExternalId": "extId"}' \
--query "UserPool.Id" --output text
# → us-east-1_XXXXXXXXX (anote este ID)
# Paso 2 : Crear el App Client (sin secret para Dashboards)
aws cognito-idp create-user-pool-client \
--user-pool-id us-east-1_XXXXXXXXX \
--client-name opensearch-dashboards-client \
--no-generate-secret \
--explicit-auth-flows ALLOW_USER_PASSWORD_AUTH ALLOW_REFRESH_TOKEN_AUTH \
--supported-identity-providers COGNITO \
--query "UserPoolClient.ClientId" --output text
# → YYYYYYYYYYYYYYYYYY (anote este ID)Crear los grupos de usuarios
# Grupo 1 : Administradores OpenSearch (acceso total)
aws cognito-idp create-group \
--user-pool-id us-east-1_XXXXXXXXX \
--group-name opensearch-admins \
--description "Accès administrateur complet à OpenSearch" \
--role-arn arn:aws:iam::123456789012:role/OpenSearch-AdminRole
# Grupo 2 : Data Scientists (solo lectura en todos los índices)
aws cognito-idp create-group \
--user-pool-id us-east-1_XXXXXXXXX \
--group-name opensearch-scientists \
--description "Lecture seule pour analyse" \
--role-arn arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole
# Grupo 3 : Operadores de monitorización (lectura solo en el índice de monitorización)
aws cognito-idp create-group \
--user-pool-id us-east-1_XXXXXXXXX \
--group-name opensearch-operators \
--description "Monitoring opérationnel uniquement" \
--role-arn arn:aws:iam::123456789012:role/OpenSearch-OperatorRole
# Crear un usuario y añadirlo a un grupo
aws cognito-idp admin-create-user \
--user-pool-id us-east-1_XXXXXXXXX \
--username dr.dupont@hopital.ca \
--user-attributes Name=email,Value=dr.dupont@hopital.ca Name=email_verified,Value=true \
--temporary-password TempPassword123!
aws cognito-idp admin-add-user-to-group \
--user-pool-id us-east-1_XXXXXXXXX \
--username dr.dupont@hopital.ca \
--group-name opensearch-scientists4. Crear el Identity Pool
# Crear el Identity Pool (enlaza el User Pool con los roles IAM)
aws cognito-identity create-identity-pool \
--identity-pool-name opensearch-sante-identity \
--allow-unauthenticated-identities false \
--cognito-identity-providers '[
{
"ProviderName": "cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX",
"ClientId": "YYYYYYYYYYYYYYYYYY",
"ServerSideTokenCheck": true
}
]' \
--query "IdentityPoolId" --output text
# → ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ
# Asociar los roles IAM al Identity Pool
aws cognito-identity set-identity-pool-roles \
--identity-pool-id "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ" \
--roles authenticated=arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole \
--role-mappings '{
"cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX:YYYYYYYYYYYYYYYYYY": {
"Type": "Rules",
"AmbiguousRoleResolution": "AuthenticatedRole",
"RulesConfiguration": {
"Rules": [
{
"Claim": "cognito:groups",
"MatchType": "Contains",
"Value": "opensearch-admins",
"RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-AdminRole"
},
{
"Claim": "cognito:groups",
"MatchType": "Contains",
"Value": "opensearch-operators",
"RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-OperatorRole"
}
]
}
}
}'5. Fine-Grained Access Control en OpenSearch
NOTEFine-Grained Access Control (FGAC) — OpenSearch permite controlar el acceso a nivel de índice, tipo de documento e incluso campos individuales. Esto permite crear roles personalizados que solo exponen los datos necesarios.
# Configurar los roles OpenSearch vía la API REST
import requests, json
host = 'https://votre-domaine.ca-central-1.es.amazonaws.com'
auth = ('admin', 'VotreMotDePasseSecurise#123')
# Rol 1 : Solo lectura en el índice donnees-cardiaques
roles_payload = {
"cluster_permissions": ["cluster:monitor/main"],
"index_permissions": [
{
"index_patterns": ["donnees-cardiaques*"],
"allowed_actions": [
"read",
"indices:data/read/search",
"indices:data/read/msearch",
"indices:admin/mappings/get"
]
}
],
"tenant_permissions": [
{
"tenant_patterns": ["global_tenant"],
"allowed_actions": ["kibana_all_read"]
}
]
}
# Crear el rol "data-scientist-reader"
requests.put(
f"{host}/_plugins/_security/api/roles/data-scientist-reader",
json=roles_payload, auth=auth
)
# Mapear el rol IAM Cognito al rol OpenSearch
role_mapping = {
"backend_roles": [
"arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole"
],
"description": "Data Scientists ont accès en lecture aux données cardiaques"
}
requests.put(
f"{host}/_plugins/_security/api/rolesmapping/data-scientist-reader",
json=role_mapping, auth=auth
)
print("Rôle et mapping créés avec succès")6. Activar Cognito en OpenSearch
# Activar la autenticación Cognito en el dominio OpenSearch
aws opensearch update-domain-config \
--domain-name sante-iot-dashboard \
--cognito-options '{
"Enabled": true,
"UserPoolId": "ca-central-1_XXXXXXXXX",
"IdentityPoolId": "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ",
"RoleArn": "arn:aws:iam::123456789012:role/CognitoAccessForAmazonOpenSearch"
}'
# Esperar a que la actualización termine
aws opensearch describe-domain \
--domain-name sante-iot-dashboard \
--query "DomainStatus.Processing"
# Esperar a que el valor sea "false"
# Obtener la URL de los Dashboards
aws opensearch describe-domain \
--domain-name sante-iot-dashboard \
--query "DomainStatus.Endpoints"va-plus-loin
Este artículo cubre los extractos más útiles — el curso completo AWS Datos en Tiempo Real (6 capítulos, 14 lecciones, ejercicios corregidos y proyecto final) te lleva hasta el final.
./acceder-au-cours-complet curso gratuito : Dominar Claude CodeFAQ
¿Cuánto tiempo se necesita para aprender AWS Datos en Tiempo Real?
Con una progresión estructurada (6 capítulos, 14 lecciones cortas y prácticas), se alcanza un nivel operativo en unas semanas a razón de 30 a 60 minutos al día. Lo importante es practicar cada noción inmediatamente.
¿Se necesitan requisitos previos?
Unos conocimientos básicos de informática bastan. Si sabes usar un terminal y leer código sencillo, estás listo.
¿Por dónde empezar concretamente?
Reproduce los comandos de este artículo y sigue el curso completo AWS Datos en Tiempo Real: encadena las 14 lecciones en orden, con ejercicios y proyecto final.
./a-lire-aussi
→ AWS Data Engineering Bootcamp explicado de forma sencilla (con diagramas y código real)→ Python Data Science : los 9 pasos clave para pasar de cero a operativo→ Python NumPy en la práctica : el código y los comandos que realmente importan📬 ¿Quieres recibir este tipo de guía cada semana? Suscríbete gratis — código real, cero palabrería.