Lance-se em AWS Dados em Tempo Real: seu primeiro passo concreto hoje
AWS Dados em Tempo Real: o essencial em um artigo — código real, diagramas e etapas concretas, extratos de um curso de 14 lições.
A melhor forma de aprender AWS Dados em Tempo Real é fazendo. Este artigo te dá o pontapé inicial com trechos práticos extraídos de um curso de 14 lições — o suficiente para obter um primeiro resultado já hoje.
tl;dr
- Introdução ao Streaming de Dados AWS
- Amazon Kinesis
- Amazon OpenSearch
- Segurança e Criptografia
- Kafka vs Kinesis e Alternativas
~$ cat ./parcours.md # AWS Dados em Tempo Real — 6 capítulos
01
Introdução ao Streaming de Dados AWS
→ Capítulo 00 – Lição 1 : Dados em Tempo Real vs Processamento em Lotes (Batch)→ Capítulo 00 – Lição 2 : O Ecossistema AWS para o Streaming de Dados
02
Amazon Kinesis
→ Capítulo 01 – Lição 1 : Amazon Kinesis Data Streams — Shards e Consumidores→ Capítulo 01 – Lição 2 : Kinesis Firehose — Entrega Gerenciada e Transformação+ 1 mais lições
03
Amazon OpenSearch
→ Capítulo 02 – Lição 1 : Amazon OpenSearch — Indexação e Pesquisa em Tempo Real→ Capítulo 02 – Lição 2 : Amazon Cognito — Autenticação para OpenSearch Dashboards+ 1 mais lições
04
Segurança e Criptografia
→ Capítulo 03 – Lição 1 : AWS KMS — Criptografia dos Dados no Kinesis e Firehose→ Capítulo 03 – Lição 2 : IAM, VPC e Boas Práticas de Segurança para o Streaming
05
Kafka vs Kinesis e Alternativas
→ Capítulo 04 – Lição 1 : Apache Kafka vs Amazon Kinesis — Comparação Completa→ Capítulo 04 – Lição 2 : Migração Kafka para Kinesis / MSK e Padrões Híbridos
06
Arquiteturas de Referência
→ Capítulo 05 – Lição 1 : Arquitetura de Referência — IoT Wearables Médicos→ Capítulo 05 – Lição 2 : Arquitetura de Referência — Detecção de Fraude E-Commerce em Tempo Real
🏁
Projeto final
→ Você sai com um projeto concreto e demonstrável
Capítulo 01 – Lição 1 : Amazon Kinesis Data Streams — Shards e Consumidores
NOTEObjetivo — Compreender a arquitetura interna do Kinesis Data Streams: como os shards permitem o paralelismo, como os produtores publicam dados e como os consumidores os processam — com e sem Enhanced Fan-Out.
1. Arquitetura dos Shards — A Unidade Básica
NOTEShard — Um shard é uma unidade de capacidade isolada em um fluxo Kinesis. Cada shard pode ingerir até 1 MB/s ou 1 000 registros/s na entrada e fornecer até 2 MB/s na saída (por consumidor).
# Capacité par shard
# ─────────────────────────────────────
# Écriture (PUT) : 1 MB/s OU 1 000 records/s (la limite la plus basse s'applique)
# Lecture (GET) : 2 MB/s par GetRecords() - partagée entre TOUS les consommateurs
#
# → Pour 5 Mo/s d'ingestion : vous avez besoin de 5 shards
# → Pour 10 000 records/s : vous avez besoin de 10 shards
# Calculer le nombre de shards nécessaires
max_ingestion_rate = 5 # MB/s
max_record_rate = 3000 # records/s
shards_pour_volume = max_ingestion_rate / 1 # = 5 shards
shards_pour_taux = max_record_rate / 1000 # = 3 shards
nb_shards = max(shards_pour_volume, shards_pour_taux)
print(f"Shards nécessaires : {int(nb_shards)}") # → 5 shardsPartition Key — Como os Dados são Distribuídos
NOTEPartition Key — Cada registro possui uma chave de partição. O Kinesis aplica uma função de hash MD5 nessa chave para determinar em qual shard o registro é roteado. Registros com a mesma chave de partição sempre vão para o mesmo shard (ordem garantida).
# Exemples de Partition Keys et leur impact
# ✅ BON : clé à haute cardinalité → distribution uniforme entre shards
# device_id = "WATCH-001", "WATCH-002", ..., "WATCH-10000"
kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(payload),
PartitionKey=payload['device_id'] # Des milliers de valeurs uniques
)
# ⚠️ PROBLÈME : clé à faible cardinalité → "hot shard" (un shard surchargé)
# Si tous les enregistrements utilisent la même clé :
kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(payload),
PartitionKey='donnees-sante' # ← Toujours le même shard ! Goulot d'étranglement
)
# ✅ SOLUTION si pas de clé naturelle : clé aléatoire
import uuid
kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(payload),
PartitionKey=str(uuid.uuid4()) # Aléatoire = distribution parfaite
# Attention : perte de l'ordre dans le shard !
)2. Produtores — Enviar Dados
put_record vs put_records
import boto3, json, time
kinesis = boto3.client('kinesis', region_name='ca-central-1')
# Méthode 1 : put_record — un seul enregistrement
# Latence : ~70ms par appel
donnee = {
"device_id": "WATCH-001",
"heart_rate": 72,
"timestamp": int(time.time())
}
response = kinesis.put_record(
StreamName='donnees-sante',
Data=json.dumps(donnee).encode('utf-8'),
PartitionKey=donnee['device_id']
)
print(f"ShardId: {response['ShardId']}")
# ─────────────────────────────────────
# Méthode 2 : put_records — plusieurs enregistrements en batch (RECOMMANDÉ)
# Latence : ~70ms pour jusqu'à 500 enregistrements !
# Limite : max 500 enregistrements ou 5MB par appel
donnees = [
{"device_id": f"WATCH-{i:03d}", "heart_rate": 60 + i, "timestamp": int(time.time())}
for i in range(10)
]
records = [
{
'Data': json.dumps(d).encode('utf-8'),
'PartitionKey': d['device_id']
}
for d in donnees
]
response = kinesis.put_records(
StreamName='donnees-sante',
Records=records
)
print(f"Enregistrements OK: {response['Records'].__len__()} envoyés")
print(f"Échecs: {response['FailedRecordCount']}")
# Gérer les échecs partiels (put_records peut avoir des succès partiels)
failed_records = [
records[i] for i, r in enumerate(response['Records'])
if 'ErrorCode' in r
]
if failed_records:
print(f"Retry nécessaire pour {len(failed_records)} enregistrements")3. Consumidores — Ler os Dados
Modo Padrão (GetRecords)
NOTEGetRecords Standard — Todos os consumidores compartilham o limite de 2 MB/s por shard. Se você tiver 3 consumidores em 1 shard, cada um obtém apenas ~667 KB/s.
# Consommateur standard avec GetRecords
import boto3, json, time
kinesis = boto3.client('kinesis', region_name='ca-central-1')
STREAM_NAME = 'donnees-sante'
# 1. Lister les shards
response = kinesis.describe_stream(StreamName=STREAM_NAME)
shards = response['StreamDescription']['Shards']
for shard in shards:
shard_id = shard['ShardId']
# 2. Obtenir un itérateur de shard (LATEST = nouvelles données seulement)
iterator_response = kinesis.get_shard_iterator(
StreamName=STREAM_NAME,
ShardId=shard_id,
ShardIteratorType='LATEST' # Ou 'TRIM_HORIZON' pour lire depuis le début
)
shard_iterator = iterator_response['ShardIterator']
# 3. Boucle de lecture
while True:
records_response = kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100 # Max 100 enregistrements par appel (ou 10MB)
)
for record in records_response['Records']:
data = json.loads(record['Data'].decode('utf-8'))
print(f"Device: {data['device_id']}, HR: {data['heart_rate']}")
shard_iterator = records_response['NextShardIterator']
# Limitation : max 5 appels GetRecords/s par shard
if not records_response['Records']:
time.sleep(1) # Attendre si pas de nouvelles donnéesEnhanced Fan-Out — Consumidores Dedicados
TIPEnhanced Fan-Out — Cada consumidor registrado obtém sua própria capacidade de 2 MB/s por shard (em vez de compartilhar). Usa HTTP/2 push em vez de polling. Ideal para aplicações críticas que exigem baixa latência (<70ms).
# Enregistrer un consommateur Enhanced Fan-Out
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante \
--consumer-name alertes-medicales-temps-reel
# Lister les consommateurs enregistrés
aws kinesis list-stream-consumers \
--stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante# Consommateur avec Enhanced Fan-Out (SubscribeToShard)
import boto3, json
kinesis = boto3.client('kinesis', region_name='ca-central-1')
STREAM_ARN = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante'
CONSUMER_ARN = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante/consumer/alertes-medicales:1234567890'
SHARD_ID = 'shardId-000000000000'
# SubscribeToShard avec HTTP/2 streaming (push au lieu de polling)
response = kinesis.subscribe_to_shard(
ConsumerARN=CONSUMER_ARN,
ShardId=SHARD_ID,
StartingPosition={'Type': 'LATEST'}
)
# Traiter le flux d'événements en temps réel
event_stream = response['EventStream']
for event in event_stream:
if 'SubscribeToShardEvent' in event:
for record in event['SubscribeToShardEvent']['Records']:
data = json.loads(record['Data'].decode('utf-8'))
print(f"[Enhanced Fan-Out] Device: {data['device_id']}, HR: {data['heart_rate']}")Capítulo 02 – Lição 1 : Amazon OpenSearch — Indexação e Pesquisa em Tempo Real
NOTEObjetivo — Compreender o funcionamento do Amazon OpenSearch Service: como criar índices, definir mapeamentos e realizar pesquisas e agregações em tempo real em dados de streaming.
1. OpenSearch — Conceitos Fundamentais
NOTEOpenSearch — Fork open-source do Elasticsearch (desenvolvido pela AWS desde 2021). Motor de pesquisa e analytics distribuído baseado no Apache Lucene. O Amazon OpenSearch Service é a versão gerenciada na AWS.
Terminologia
| Index | ≈ Tabela de um banco de dados |
| Document | ≈ Linha em uma tabela (JSON) |
| Field | ≈ Coluna em uma tabela |
| Shard | Fragmento de um índice (distribuição) |
| Replica | Cópia de um shard (alta disponibilidade) |
| Mapping | Esquema dos tipos de campos |
Quando Usar OpenSearch
2. Criar um Domínio OpenSearch
# Créer un domaine OpenSearch pour données IoT
aws opensearch create-domain \
--domain-name sante-iot-dashboard \
--engine-version 'OpenSearch_2.11' \
--cluster-config '{
"InstanceType": "t3.small.search",
"InstanceCount": 2,
"DedicatedMasterEnabled": false,
"ZoneAwarenessEnabled": true,
"ZoneAwarenessConfig": {"AvailabilityZoneCount": 2}
}' \
--ebs-options '{
"EBSEnabled": true,
"VolumeType": "gp3",
"VolumeSize": 20,
"Iops": 3000
}' \
--encryption-at-rest-options '{"Enabled": true}' \
--node-to-node-encryption-options '{"Enabled": true}' \
--domain-endpoint-options '{"EnforceHTTPS": true, "TLSSecurityPolicy": "Policy-Min-TLS-1-2-2019-07"}' \
--advanced-security-options '{
"Enabled": true,
"InternalUserDatabaseEnabled": true,
"MasterUserOptions": {
"MasterUserName": "admin",
"MasterUserPassword": "VotreMotDePasseSecurise#123"
}
}'
# Vérifier le statut du domaine
aws opensearch describe-domain \
--domain-name sante-iot-dashboard \
--query "DomainStatus.Processing"3. Criar um Índice com Mapeamento
NOTEMapping — O mapeamento define os tipos de dados de cada campo. O OpenSearch pode detectar automaticamente os tipos (dynamic mapping), mas é recomendado definir explicitamente o mapeamento para os campos críticos a fim de evitar erros de tipagem.
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
# Connexion au domaine OpenSearch
region = 'ca-central-1'
service = 'es'
host = 'https://votre-domaine.ca-central-1.es.amazonaws.com'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
region, service,
session_token=credentials.token
)
client = OpenSearch(
hosts=[{'host': host.replace('https://', ''), 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
# Créer l'index avec un mapping explicite
mapping = {
"settings": {
"number_of_shards": 2, # Distribuer sur 2 nœuds
"number_of_replicas": 1 # 1 copie pour la HA
},
"mappings": {
"properties": {
"device_id": {"type": "keyword"}, # Valeur exacte (non analysée)
"patient_id": {"type": "keyword"},
"timestamp": {"type": "date", "format": "epoch_second"},
"heart_rate": {"type": "integer"},
"spo2": {"type": "float"},
"temperature": {"type": "float"},
"steps": {"type": "integer"},
"severite": {"type": "keyword"},
"region": {"type": "keyword"},
"processed_at":{"type": "date"},
# Champ texte analysé pour la recherche full-text
"notes": {"type": "text", "analyzer": "french"}
}
}
}
response = client.indices.create(index='donnees-cardiaques', body=mapping)
print(f"Index créé : {response['acknowledged']}")
# Vérifier le mapping
mapping_info = client.indices.get_mapping(index='donnees-cardiaques')
print(f"Mapping : {list(mapping_info['donnees-cardiaques']['mappings']['properties'].keys())}")4. Indexar Documentos
import time, json, random
# Indexer un document individuel
document = {
"device_id": "WATCH-042",
"patient_id": "PATIENT-42",
"timestamp": int(time.time()),
"heart_rate": 88,
"spo2": 97.5,
"temperature": 37.1,
"steps": 12453,
"severite": "NORMAL",
"region": "ca-central-1"
}
response = client.index(
index='donnees-cardiaques',
body=document,
id=f"{document['device_id']}-{document['timestamp']}", # ID unique
refresh=True # Rend le document immédiatement recherchable
)
print(f"Document indexé : {response['result']}")
# Indexation en bulk (plus efficace pour de grands volumes)
from opensearchpy.helpers import bulk
def generer_donnees(n_montres=100):
"""Générer n enregistrements de montres cardiaques"""
actions = []
for i in range(n_montres):
heart_rate = random.randint(55, 115)
spo2 = random.uniform(93, 100)
doc = {
"_index": "donnees-cardiaques",
"_id": f"WATCH-{i:03d}-{int(time.time())}",
"_source": {
"device_id": f"WATCH-{i:03d}",
"patient_id": f"PATIENT-{i}",
"timestamp": int(time.time()),
"heart_rate": heart_rate,
"spo2": round(spo2, 1),
"temperature": round(random.uniform(36.0, 37.8), 1),
"steps": random.randint(0, 20000),
"severite": "CRITIQUE" if heart_rate > 140 else "ATTENTION" if heart_rate > 110 else "NORMAL",
"region": "ca-central-1"
}
}
actions.append(doc)
return actions
# Indexation bulk
success, errors = bulk(client, generer_donnees(100))
print(f"Indexé : {success} documents, {len(errors)} erreurs")5. Pesquisas e Agregações
Pesquisa Simples (Match Query)
# Trouver tous les enregistrements CRITIQUES
query_critiques = {
"query": {
"term": {"severite": "CRITIQUE"}
},
"sort": [{"timestamp": {"order": "desc"}}],
"size": 20
}
response = client.search(index='donnees-cardiaques', body=query_critiques)
print(f"Enregistrements critiques : {response['hits']['total']['value']}")
for hit in response['hits']['hits']:
print(f" Device: {hit['_source']['device_id']}, HR: {hit['_source']['heart_rate']}")Pesquisa com Filtro de Intervalo (Range Query)
# Trouver les anomalies cardiaques des 30 dernières minutes
now = int(time.time())
thirty_min_ago = now - 1800
query_anomalies = {
"query": {
"bool": {
"must": [
{
"range": {
"timestamp": {
"gte": thirty_min_ago,
"lte": now
}
}
}
],
"should": [
{"range": {"heart_rate": {"gt": 120}}}, # Tachycardie
{"range": {"heart_rate": {"lt": 45}}}, # Bradycardie
{"range": {"spo2": {"lt": 90}}} # Hypoxémie
],
"minimum_should_match": 1
}
},
"sort": [{"timestamp": {"order": "desc"}}]
}
response = client.search(index='donnees-cardiaques', body=query_anomalies)
print(f"Anomalies détectées : {response['hits']['total']['value']}")Agregações — Estatísticas em Tempo Real
# Agrégation : statistiques cardiaques par device (dernières 24h)
query_stats = {
"query": {
"range": {
"timestamp": {"gte": "now-24h/h", "lte": "now"}
}
},
"aggs": {
"par_device": {
"terms": {"field": "device_id", "size": 50},
"aggs": {
"fc_moyenne": {"avg": {"field": "heart_rate"}},
"fc_max": {"max": {"field": "heart_rate"}},
"fc_min": {"min": {"field": "heart_rate"}},
"spo2_moyenne":{"avg": {"field": "spo2"}},
"nb_critiques":{
"filter": {"term": {"severite": "CRITIQUE"}}
}
}
},
"par_severite": {
"terms": {"field": "severite"},
"aggs": {
"nb_patients": {"cardinality": {"field": "patient_id"}}
}
},
"tendance_horaire": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "1h",
"format": "yyyy-MM-dd HH:mm"
},
"aggs": {
"fc_moyenne": {"avg": {"field": "heart_rate"}}
}
}
},
"size": 0 # Ne retourner que les agrégations, pas les documents
}
response = client.search(index='donnees-cardiaques', body=query_stats)
buckets = response['aggregations']['par_severite']['buckets']
for b in buckets:
print(f"Sévérité {b['key']}: {b['doc_count']} lectures, {b['nb_patients']['value']} patients")Capítulo 02 – Lição 2 : Amazon Cognito — Autenticação para OpenSearch Dashboards
NOTEObjetivo — Configurar o Amazon Cognito para proteger o acesso aos OpenSearch Dashboards, criar papéis distintos para diferentes tipos de usuários (administradores, cientistas de dados, operadores de monitoramento) e aplicar controle de acesso granular aos índices.
1. Por que Cognito para OpenSearch?
NOTEProblema — Por padrão, o OpenSearch Dashboards usa autenticação HTTP básica (nome de usuário/senha). Isso não é adequado para uma organização com dezenas de usuários, perfis diferentes e necessidades de autenticação SSO.
Sem Cognito
Com Cognito
2. Arquitetura Cognito + OpenSearch
# Architecture d'authentification # UTILISATEUR # ↓ (1) Accède à OpenSearch Dashboards # AMAZON COGNITO USER POOL # ↓ (2) Vérifie les credentials (ou SSO via IdP) # ↓ (3) Émet un token JWT # AMAZON COGNITO IDENTITY POOL # ↓ (4) Échange le JWT contre des credentials AWS temporaires (STS) # ↓ (5) Assigne un rôle IAM selon le groupe Cognito # AMAZON OPENSEARCH SERVICE # ↓ (6) Vérifie les permissions Fine-Grained Access Control (FGAC) # DONNÉES DE L'INDEX
3. Criar o User Pool Cognito
# Étape 1 : Créer le User Pool
aws cognito-idp create-user-pool \
--pool-name opensearch-sante-users \
--policies '{
"PasswordPolicy": {
"MinimumLength": 12,
"RequireUppercase": true,
"RequireLowercase": true,
"RequireNumbers": true,
"RequireSymbols": true
}
}' \
--auto-verified-attributes email \
--mfa-configuration OPTIONAL \
--sms-configuration '{"SnsCallerArn": "arn:aws:iam::123456789012:role/CognitoSNSRole", "ExternalId": "extId"}' \
--query "UserPool.Id" --output text
# → us-east-1_XXXXXXXXX (notez cet ID)
# Étape 2 : Créer le App Client (sans secret pour Dashboards)
aws cognito-idp create-user-pool-client \
--user-pool-id us-east-1_XXXXXXXXX \
--client-name opensearch-dashboards-client \
--no-generate-secret \
--explicit-auth-flows ALLOW_USER_PASSWORD_AUTH ALLOW_REFRESH_TOKEN_AUTH \
--supported-identity-providers COGNITO \
--query "UserPoolClient.ClientId" --output text
# → YYYYYYYYYYYYYYYYYY (notez cet ID)Criar os Grupos de Usuários
# Groupe 1 : Administrateurs OpenSearch (accès total)
aws cognito-idp create-group \
--user-pool-id us-east-1_XXXXXXXXX \
--group-name opensearch-admins \
--description "Accès administrateur complet à OpenSearch" \
--role-arn arn:aws:iam::123456789012:role/OpenSearch-AdminRole
# Groupe 2 : Data Scientists (lecture seule sur tous les index)
aws cognito-idp create-group \
--user-pool-id us-east-1_XXXXXXXXX \
--group-name opensearch-scientists \
--description "Lecture seule pour analyse" \
--role-arn arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole
# Groupe 3 : Opérateurs de monitoring (lecture sur l'index monitoring seulement)
aws cognito-idp create-group \
--user-pool-id us-east-1_XXXXXXXXX \
--group-name opensearch-operators \
--description "Monitoring opérationnel uniquement" \
--role-arn arn:aws:iam::123456789012:role/OpenSearch-OperatorRole
# Créer un utilisateur et l'ajouter à un groupe
aws cognito-idp admin-create-user \
--user-pool-id us-east-1_XXXXXXXXX \
--username dr.dupont@hopital.ca \
--user-attributes Name=email,Value=dr.dupont@hopital.ca Name=email_verified,Value=true \
--temporary-password TempPassword123!
aws cognito-idp admin-add-user-to-group \
--user-pool-id us-east-1_XXXXXXXXX \
--username dr.dupont@hopital.ca \
--group-name opensearch-scientists4. Criar o Identity Pool
# Créer l'Identity Pool (lie le User Pool aux rôles IAM)
aws cognito-identity create-identity-pool \
--identity-pool-name opensearch-sante-identity \
--allow-unauthenticated-identities false \
--cognito-identity-providers '[
{
"ProviderName": "cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX",
"ClientId": "YYYYYYYYYYYYYYYYYY",
"ServerSideTokenCheck": true
}
]' \
--query "IdentityPoolId" --output text
# → ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ
# Associer les rôles IAM à l'Identity Pool
aws cognito-identity set-identity-pool-roles \
--identity-pool-id "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ" \
--roles authenticated=arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole \
--role-mappings '{
"cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX:YYYYYYYYYYYYYYYYYY": {
"Type": "Rules",
"AmbiguousRoleResolution": "AuthenticatedRole",
"RulesConfiguration": {
"Rules": [
{
"Claim": "cognito:groups",
"MatchType": "Contains",
"Value": "opensearch-admins",
"RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-AdminRole"
},
{
"Claim": "cognito:groups",
"MatchType": "Contains",
"Value": "opensearch-operators",
"RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-OperatorRole"
}
]
}
}
}'5. Controle de Acesso Granular no OpenSearch
NOTEFine-Grained Access Control (FGAC) — O OpenSearch permite controlar o acesso no nível do índice, do tipo de documento e até de campos individuais. Isso permite criar papéis personalizados que expõem apenas os dados necessários.
# Configurer les rôles OpenSearch via l'API REST
import requests, json
host = 'https://votre-domaine.ca-central-1.es.amazonaws.com'
auth = ('admin', 'VotreMotDePasseSecurise#123')
# Rôle 1 : Lecture seule sur l'index données-cardiaques
roles_payload = {
"cluster_permissions": ["cluster:monitor/main"],
"index_permissions": [
{
"index_patterns": ["donnees-cardiaques*"],
"allowed_actions": [
"read",
"indices:data/read/search",
"indices:data/read/msearch",
"indices:admin/mappings/get"
]
}
],
"tenant_permissions": [
{
"tenant_patterns": ["global_tenant"],
"allowed_actions": ["kibana_all_read"]
}
]
}
# Créer le rôle "data-scientist-reader"
requests.put(
f"{host}/_plugins/_security/api/roles/data-scientist-reader",
json=roles_payload, auth=auth
)
# Mapper le rôle IAM Cognito vers le rôle OpenSearch
role_mapping = {
"backend_roles": [
"arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole"
],
"description": "Data Scientists ont accès en lecture aux données cardiaques"
}
requests.put(
f"{host}/_plugins/_security/api/rolesmapping/data-scientist-reader",
json=role_mapping, auth=auth
)
print("Rôle et mapping créés avec succès")6. Ativar Cognito no OpenSearch
# Activer l'authentification Cognito sur le domaine OpenSearch
aws opensearch update-domain-config \
--domain-name sante-iot-dashboard \
--cognito-options '{
"Enabled": true,
"UserPoolId": "ca-central-1_XXXXXXXXX",
"IdentityPoolId": "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ",
"RoleArn": "arn:aws:iam::123456789012:role/CognitoAccessForAmazonOpenSearch"
}'
# Attendre que la mise à jour soit terminée
aws opensearch describe-domain \
--domain-name sante-iot-dashboard \
--query "DomainStatus.Processing"
# Attendre que la valeur soit "false"
# Récupérer l'URL des Dashboards
aws opensearch describe-domain \
--domain-name sante-iot-dashboard \
--query "DomainStatus.Endpoints"va-plus-loin
Este artigo cobre os trechos mais úteis — o curso completo AWS Dados em Tempo Real (6 capítulos, 14 lições, exercícios corrigidos e projeto final) leva você até o fim.
./acceder-au-cours-complet curso gratuito : Dominando o Claude CodeFAQ
Quanto tempo para aprender AWS Dados em Tempo Real?
Com uma progressão estruturada (6 capítulos, 14 lições curtas e práticas), você atinge um nível operacional em algumas semanas dedicando 30 a 60 minutos por dia. O importante é praticar cada conceito imediatamente.
Precisa de pré-requisitos?
Básicos de informática são suficientes. Se você sabe usar um terminal e ler código simples, está pronto.
Por onde começar na prática?
Reproduza os comandos deste artigo e depois siga o curso completo AWS Dados em Tempo Real: ele encadeia as 14 lições em ordem, com exercícios e projeto final.
./a-lire-aussi
→ AWS Data Engineering Bootcamp explicado de forma simples (com diagramas e código real)→ Python Data Science : as 9 etapas-chave para ir de zero a operacional→ Python NumPy na prática : o código e os comandos que realmente importam📬 Quer receber este tipo de guia toda semana? Inscreva-se gratuitamente — código real, zero enrolação.