Lance-se em AWS Dados em Tempo Real: seu primeiro passo concreto hoje

AWS Dados em Tempo Real: o essencial em um artigo — código real, diagramas e etapas concretas, extratos de um curso de 14 lições.

Lance-se em AWS Dados em Tempo Real: seu primeiro passo concreto hoje

A melhor forma de aprender AWS Dados em Tempo Real é fazendo. Este artigo te dá o pontapé inicial com trechos práticos extraídos de um curso de 14 lições — o suficiente para obter um primeiro resultado já hoje.

tl;dr
  • Introdução ao Streaming de Dados AWS
  • Amazon Kinesis
  • Amazon OpenSearch
  • Segurança e Criptografia
  • Kafka vs Kinesis e Alternativas
~$ cat ./parcours.md # AWS Dados em Tempo Real — 6 capítulos
01
Introdução ao Streaming de Dados AWS
→ Capítulo 00 – Lição 1 : Dados em Tempo Real vs Processamento em Lotes (Batch)→ Capítulo 00 – Lição 2 : O Ecossistema AWS para o Streaming de Dados
02
Amazon Kinesis
→ Capítulo 01 – Lição 1 : Amazon Kinesis Data Streams — Shards e Consumidores→ Capítulo 01 – Lição 2 : Kinesis Firehose — Entrega Gerenciada e Transformação+ 1 mais lições
03
Amazon OpenSearch
→ Capítulo 02 – Lição 1 : Amazon OpenSearch — Indexação e Pesquisa em Tempo Real→ Capítulo 02 – Lição 2 : Amazon Cognito — Autenticação para OpenSearch Dashboards+ 1 mais lições
04
Segurança e Criptografia
→ Capítulo 03 – Lição 1 : AWS KMS — Criptografia dos Dados no Kinesis e Firehose→ Capítulo 03 – Lição 2 : IAM, VPC e Boas Práticas de Segurança para o Streaming
05
Kafka vs Kinesis e Alternativas
→ Capítulo 04 – Lição 1 : Apache Kafka vs Amazon Kinesis — Comparação Completa→ Capítulo 04 – Lição 2 : Migração Kafka para Kinesis / MSK e Padrões Híbridos
06
Arquiteturas de Referência
→ Capítulo 05 – Lição 1 : Arquitetura de Referência — IoT Wearables Médicos→ Capítulo 05 – Lição 2 : Arquitetura de Referência — Detecção de Fraude E-Commerce em Tempo Real
🏁
Projeto final
→ Você sai com um projeto concreto e demonstrável

Capítulo 01 – Lição 1 : Amazon Kinesis Data Streams — Shards e Consumidores

NOTEObjetivo — Compreender a arquitetura interna do Kinesis Data Streams: como os shards permitem o paralelismo, como os produtores publicam dados e como os consumidores os processam — com e sem Enhanced Fan-Out.

1. Arquitetura dos Shards — A Unidade Básica

NOTEShard — Um shard é uma unidade de capacidade isolada em um fluxo Kinesis. Cada shard pode ingerir até 1 MB/s ou 1 000 registros/s na entrada e fornecer até 2 MB/s na saída (por consumidor).
output
# Capacité par shard
# ─────────────────────────────────────
# Écriture (PUT) :  1 MB/s  OU  1 000 records/s  (la limite la plus basse s'applique)
# Lecture  (GET) :  2 MB/s  par GetRecords() - partagée entre TOUS les consommateurs
#
# → Pour 5 Mo/s d'ingestion : vous avez besoin de 5 shards
# → Pour 10 000 records/s    : vous avez besoin de 10 shards

# Calculer le nombre de shards nécessaires
max_ingestion_rate = 5    # MB/s
max_record_rate    = 3000 # records/s

shards_pour_volume = max_ingestion_rate / 1    # = 5 shards
shards_pour_taux   = max_record_rate / 1000    # = 3 shards

nb_shards = max(shards_pour_volume, shards_pour_taux)
print(f"Shards nécessaires : {int(nb_shards)}")  # → 5 shards

Partition Key — Como os Dados são Distribuídos

NOTEPartition Key — Cada registro possui uma chave de partição. O Kinesis aplica uma função de hash MD5 nessa chave para determinar em qual shard o registro é roteado. Registros com a mesma chave de partição sempre vão para o mesmo shard (ordem garantida).
output
# Exemples de Partition Keys et leur impact

# ✅ BON : clé à haute cardinalité → distribution uniforme entre shards
# device_id = "WATCH-001", "WATCH-002", ..., "WATCH-10000"
kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(payload),
    PartitionKey=payload['device_id']   # Des milliers de valeurs uniques
)

# ⚠️ PROBLÈME : clé à faible cardinalité → "hot shard" (un shard surchargé)
# Si tous les enregistrements utilisent la même clé :
kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(payload),
    PartitionKey='donnees-sante'   # ← Toujours le même shard ! Goulot d'étranglement
)

# ✅ SOLUTION si pas de clé naturelle : clé aléatoire
import uuid
kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(payload),
    PartitionKey=str(uuid.uuid4())   # Aléatoire = distribution parfaite
    # Attention : perte de l'ordre dans le shard !
)

2. Produtores — Enviar Dados

put_record vs put_records

output
import boto3, json, time

kinesis = boto3.client('kinesis', region_name='ca-central-1')

# Méthode 1 : put_record — un seul enregistrement
# Latence : ~70ms par appel
donnee = {
    "device_id": "WATCH-001",
    "heart_rate": 72,
    "timestamp": int(time.time())
}
response = kinesis.put_record(
    StreamName='donnees-sante',
    Data=json.dumps(donnee).encode('utf-8'),
    PartitionKey=donnee['device_id']
)
print(f"ShardId: {response['ShardId']}")

# ─────────────────────────────────────

# Méthode 2 : put_records — plusieurs enregistrements en batch (RECOMMANDÉ)
# Latence : ~70ms pour jusqu'à 500 enregistrements !
# Limite : max 500 enregistrements ou 5MB par appel
donnees = [
    {"device_id": f"WATCH-{i:03d}", "heart_rate": 60 + i, "timestamp": int(time.time())}
    for i in range(10)
]

records = [
    {
        'Data': json.dumps(d).encode('utf-8'),
        'PartitionKey': d['device_id']
    }
    for d in donnees
]

response = kinesis.put_records(
    StreamName='donnees-sante',
    Records=records
)

print(f"Enregistrements OK: {response['Records'].__len__()} envoyés")
print(f"Échecs: {response['FailedRecordCount']}")

# Gérer les échecs partiels (put_records peut avoir des succès partiels)
failed_records = [
    records[i] for i, r in enumerate(response['Records'])
    if 'ErrorCode' in r
]
if failed_records:
    print(f"Retry nécessaire pour {len(failed_records)} enregistrements")

3. Consumidores — Ler os Dados

Modo Padrão (GetRecords)

NOTEGetRecords Standard — Todos os consumidores compartilham o limite de 2 MB/s por shard. Se você tiver 3 consumidores em 1 shard, cada um obtém apenas ~667 KB/s.
output
# Consommateur standard avec GetRecords
import boto3, json, time

kinesis = boto3.client('kinesis', region_name='ca-central-1')
STREAM_NAME = 'donnees-sante'

# 1. Lister les shards
response = kinesis.describe_stream(StreamName=STREAM_NAME)
shards = response['StreamDescription']['Shards']

for shard in shards:
    shard_id = shard['ShardId']

    # 2. Obtenir un itérateur de shard (LATEST = nouvelles données seulement)
    iterator_response = kinesis.get_shard_iterator(
        StreamName=STREAM_NAME,
        ShardId=shard_id,
        ShardIteratorType='LATEST'  # Ou 'TRIM_HORIZON' pour lire depuis le début
    )
    shard_iterator = iterator_response['ShardIterator']

    # 3. Boucle de lecture
    while True:
        records_response = kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=100  # Max 100 enregistrements par appel (ou 10MB)
        )

        for record in records_response['Records']:
            data = json.loads(record['Data'].decode('utf-8'))
            print(f"Device: {data['device_id']}, HR: {data['heart_rate']}")

        shard_iterator = records_response['NextShardIterator']

        # Limitation : max 5 appels GetRecords/s par shard
        if not records_response['Records']:
            time.sleep(1)  # Attendre si pas de nouvelles données

Enhanced Fan-Out — Consumidores Dedicados

TIPEnhanced Fan-Out — Cada consumidor registrado obtém sua própria capacidade de 2 MB/s por shard (em vez de compartilhar). Usa HTTP/2 push em vez de polling. Ideal para aplicações críticas que exigem baixa latência (<70ms).
bash
# Enregistrer un consommateur Enhanced Fan-Out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante \
    --consumer-name alertes-medicales-temps-reel

# Lister les consommateurs enregistrés
aws kinesis list-stream-consumers \
    --stream-arn arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante
output
# Consommateur avec Enhanced Fan-Out (SubscribeToShard)
import boto3, json

kinesis = boto3.client('kinesis', region_name='ca-central-1')

STREAM_ARN     = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante'
CONSUMER_ARN   = 'arn:aws:kinesis:ca-central-1:123456789012:stream/donnees-sante/consumer/alertes-medicales:1234567890'
SHARD_ID       = 'shardId-000000000000'

# SubscribeToShard avec HTTP/2 streaming (push au lieu de polling)
response = kinesis.subscribe_to_shard(
    ConsumerARN=CONSUMER_ARN,
    ShardId=SHARD_ID,
    StartingPosition={'Type': 'LATEST'}
)

# Traiter le flux d'événements en temps réel
event_stream = response['EventStream']
for event in event_stream:
    if 'SubscribeToShardEvent' in event:
        for record in event['SubscribeToShardEvent']['Records']:
            data = json.loads(record['Data'].decode('utf-8'))
            print(f"[Enhanced Fan-Out] Device: {data['device_id']}, HR: {data['heart_rate']}")

Capítulo 02 – Lição 1 : Amazon OpenSearch — Indexação e Pesquisa em Tempo Real

NOTEObjetivo — Compreender o funcionamento do Amazon OpenSearch Service: como criar índices, definir mapeamentos e realizar pesquisas e agregações em tempo real em dados de streaming.

1. OpenSearch — Conceitos Fundamentais

NOTEOpenSearch — Fork open-source do Elasticsearch (desenvolvido pela AWS desde 2021). Motor de pesquisa e analytics distribuído baseado no Apache Lucene. O Amazon OpenSearch Service é a versão gerenciada na AWS.

Terminologia

Index≈ Tabela de um banco de dados
Document≈ Linha em uma tabela (JSON)
Field≈ Coluna em uma tabela
ShardFragmento de um índice (distribuição)
ReplicaCópia de um shard (alta disponibilidade)
MappingEsquema dos tipos de campos

Quando Usar OpenSearch

2. Criar um Domínio OpenSearch

bash
# Créer un domaine OpenSearch pour données IoT
aws opensearch create-domain \
    --domain-name sante-iot-dashboard \
    --engine-version 'OpenSearch_2.11' \
    --cluster-config '{
        "InstanceType": "t3.small.search",
        "InstanceCount": 2,
        "DedicatedMasterEnabled": false,
        "ZoneAwarenessEnabled": true,
        "ZoneAwarenessConfig": {"AvailabilityZoneCount": 2}
    }' \
    --ebs-options '{
        "EBSEnabled": true,
        "VolumeType": "gp3",
        "VolumeSize": 20,
        "Iops": 3000
    }' \
    --encryption-at-rest-options '{"Enabled": true}' \
    --node-to-node-encryption-options '{"Enabled": true}' \
    --domain-endpoint-options '{"EnforceHTTPS": true, "TLSSecurityPolicy": "Policy-Min-TLS-1-2-2019-07"}' \
    --advanced-security-options '{
        "Enabled": true,
        "InternalUserDatabaseEnabled": true,
        "MasterUserOptions": {
            "MasterUserName": "admin",
            "MasterUserPassword": "VotreMotDePasseSecurise#123"
        }
    }'

# Vérifier le statut du domaine
aws opensearch describe-domain \
    --domain-name sante-iot-dashboard \
    --query "DomainStatus.Processing"

3. Criar um Índice com Mapeamento

NOTEMapping — O mapeamento define os tipos de dados de cada campo. O OpenSearch pode detectar automaticamente os tipos (dynamic mapping), mas é recomendado definir explicitamente o mapeamento para os campos críticos a fim de evitar erros de tipagem.
output
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

# Connexion au domaine OpenSearch
region   = 'ca-central-1'
service  = 'es'
host     = 'https://votre-domaine.ca-central-1.es.amazonaws.com'

credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region, service,
    session_token=credentials.token
)

client = OpenSearch(
    hosts=[{'host': host.replace('https://', ''), 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

# Créer l'index avec un mapping explicite
mapping = {
    "settings": {
        "number_of_shards":   2,    # Distribuer sur 2 nœuds
        "number_of_replicas": 1     # 1 copie pour la HA
    },
    "mappings": {
        "properties": {
            "device_id":   {"type": "keyword"},     # Valeur exacte (non analysée)
            "patient_id":  {"type": "keyword"},
            "timestamp":   {"type": "date", "format": "epoch_second"},
            "heart_rate":  {"type": "integer"},
            "spo2":        {"type": "float"},
            "temperature": {"type": "float"},
            "steps":       {"type": "integer"},
            "severite":    {"type": "keyword"},
            "region":      {"type": "keyword"},
            "processed_at":{"type": "date"},
            # Champ texte analysé pour la recherche full-text
            "notes":       {"type": "text", "analyzer": "french"}
        }
    }
}

response = client.indices.create(index='donnees-cardiaques', body=mapping)
print(f"Index créé : {response['acknowledged']}")

# Vérifier le mapping
mapping_info = client.indices.get_mapping(index='donnees-cardiaques')
print(f"Mapping : {list(mapping_info['donnees-cardiaques']['mappings']['properties'].keys())}")

4. Indexar Documentos

output
import time, json, random

# Indexer un document individuel
document = {
    "device_id":   "WATCH-042",
    "patient_id":  "PATIENT-42",
    "timestamp":   int(time.time()),
    "heart_rate":  88,
    "spo2":        97.5,
    "temperature": 37.1,
    "steps":       12453,
    "severite":    "NORMAL",
    "region":      "ca-central-1"
}

response = client.index(
    index='donnees-cardiaques',
    body=document,
    id=f"{document['device_id']}-{document['timestamp']}",  # ID unique
    refresh=True  # Rend le document immédiatement recherchable
)
print(f"Document indexé : {response['result']}")

# Indexation en bulk (plus efficace pour de grands volumes)
from opensearchpy.helpers import bulk

def generer_donnees(n_montres=100):
    """Générer n enregistrements de montres cardiaques"""
    actions = []
    for i in range(n_montres):
        heart_rate = random.randint(55, 115)
        spo2       = random.uniform(93, 100)

        doc = {
            "_index":    "donnees-cardiaques",
            "_id":       f"WATCH-{i:03d}-{int(time.time())}",
            "_source": {
                "device_id":   f"WATCH-{i:03d}",
                "patient_id":  f"PATIENT-{i}",
                "timestamp":   int(time.time()),
                "heart_rate":  heart_rate,
                "spo2":        round(spo2, 1),
                "temperature": round(random.uniform(36.0, 37.8), 1),
                "steps":       random.randint(0, 20000),
                "severite":    "CRITIQUE" if heart_rate > 140 else "ATTENTION" if heart_rate > 110 else "NORMAL",
                "region":      "ca-central-1"
            }
        }
        actions.append(doc)
    return actions

# Indexation bulk
success, errors = bulk(client, generer_donnees(100))
print(f"Indexé : {success} documents, {len(errors)} erreurs")

5. Pesquisas e Agregações

Pesquisa Simples (Match Query)

output
# Trouver tous les enregistrements CRITIQUES
query_critiques = {
    "query": {
        "term": {"severite": "CRITIQUE"}
    },
    "sort": [{"timestamp": {"order": "desc"}}],
    "size": 20
}

response = client.search(index='donnees-cardiaques', body=query_critiques)
print(f"Enregistrements critiques : {response['hits']['total']['value']}")
for hit in response['hits']['hits']:
    print(f"  Device: {hit['_source']['device_id']}, HR: {hit['_source']['heart_rate']}")

Pesquisa com Filtro de Intervalo (Range Query)

output
# Trouver les anomalies cardiaques des 30 dernières minutes
now = int(time.time())
thirty_min_ago = now - 1800

query_anomalies = {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "timestamp": {
                            "gte": thirty_min_ago,
                            "lte": now
                        }
                    }
                }
            ],
            "should": [
                {"range": {"heart_rate": {"gt": 120}}},    # Tachycardie
                {"range": {"heart_rate": {"lt": 45}}},     # Bradycardie
                {"range": {"spo2": {"lt": 90}}}            # Hypoxémie
            ],
            "minimum_should_match": 1
        }
    },
    "sort": [{"timestamp": {"order": "desc"}}]
}

response = client.search(index='donnees-cardiaques', body=query_anomalies)
print(f"Anomalies détectées : {response['hits']['total']['value']}")

Agregações — Estatísticas em Tempo Real

output
# Agrégation : statistiques cardiaques par device (dernières 24h)
query_stats = {
    "query": {
        "range": {
            "timestamp": {"gte": "now-24h/h", "lte": "now"}
        }
    },
    "aggs": {
        "par_device": {
            "terms": {"field": "device_id", "size": 50},
            "aggs": {
                "fc_moyenne":  {"avg": {"field": "heart_rate"}},
                "fc_max":      {"max": {"field": "heart_rate"}},
                "fc_min":      {"min": {"field": "heart_rate"}},
                "spo2_moyenne":{"avg": {"field": "spo2"}},
                "nb_critiques":{
                    "filter": {"term": {"severite": "CRITIQUE"}}
                }
            }
        },
        "par_severite": {
            "terms": {"field": "severite"},
            "aggs": {
                "nb_patients": {"cardinality": {"field": "patient_id"}}
            }
        },
        "tendance_horaire": {
            "date_histogram": {
                "field":            "timestamp",
                "fixed_interval":   "1h",
                "format":           "yyyy-MM-dd HH:mm"
            },
            "aggs": {
                "fc_moyenne": {"avg": {"field": "heart_rate"}}
            }
        }
    },
    "size": 0   # Ne retourner que les agrégations, pas les documents
}

response = client.search(index='donnees-cardiaques', body=query_stats)
buckets = response['aggregations']['par_severite']['buckets']
for b in buckets:
    print(f"Sévérité {b['key']}: {b['doc_count']} lectures, {b['nb_patients']['value']} patients")

Capítulo 02 – Lição 2 : Amazon Cognito — Autenticação para OpenSearch Dashboards

NOTEObjetivo — Configurar o Amazon Cognito para proteger o acesso aos OpenSearch Dashboards, criar papéis distintos para diferentes tipos de usuários (administradores, cientistas de dados, operadores de monitoramento) e aplicar controle de acesso granular aos índices.

1. Por que Cognito para OpenSearch?

NOTEProblema — Por padrão, o OpenSearch Dashboards usa autenticação HTTP básica (nome de usuário/senha). Isso não é adequado para uma organização com dezenas de usuários, perfis diferentes e necessidades de autenticação SSO.

Sem Cognito

Com Cognito

2. Arquitetura Cognito + OpenSearch

output
# Architecture d'authentification

# UTILISATEUR
#     ↓  (1) Accède à OpenSearch Dashboards
# AMAZON COGNITO USER POOL
#     ↓  (2) Vérifie les credentials (ou SSO via IdP)
#     ↓  (3) Émet un token JWT
# AMAZON COGNITO IDENTITY POOL
#     ↓  (4) Échange le JWT contre des credentials AWS temporaires (STS)
#     ↓  (5) Assigne un rôle IAM selon le groupe Cognito
# AMAZON OPENSEARCH SERVICE
#     ↓  (6) Vérifie les permissions Fine-Grained Access Control (FGAC)
# DONNÉES DE L'INDEX

3. Criar o User Pool Cognito

bash
# Étape 1 : Créer le User Pool
aws cognito-idp create-user-pool \
    --pool-name opensearch-sante-users \
    --policies '{
        "PasswordPolicy": {
            "MinimumLength": 12,
            "RequireUppercase": true,
            "RequireLowercase": true,
            "RequireNumbers": true,
            "RequireSymbols": true
        }
    }' \
    --auto-verified-attributes email \
    --mfa-configuration OPTIONAL \
    --sms-configuration '{"SnsCallerArn": "arn:aws:iam::123456789012:role/CognitoSNSRole", "ExternalId": "extId"}' \
    --query "UserPool.Id" --output text
# → us-east-1_XXXXXXXXX (notez cet ID)

# Étape 2 : Créer le App Client (sans secret pour Dashboards)
aws cognito-idp create-user-pool-client \
    --user-pool-id us-east-1_XXXXXXXXX \
    --client-name opensearch-dashboards-client \
    --no-generate-secret \
    --explicit-auth-flows ALLOW_USER_PASSWORD_AUTH ALLOW_REFRESH_TOKEN_AUTH \
    --supported-identity-providers COGNITO \
    --query "UserPoolClient.ClientId" --output text
# → YYYYYYYYYYYYYYYYYY (notez cet ID)

Criar os Grupos de Usuários

bash
# Groupe 1 : Administrateurs OpenSearch (accès total)
aws cognito-idp create-group \
    --user-pool-id us-east-1_XXXXXXXXX \
    --group-name opensearch-admins \
    --description "Accès administrateur complet à OpenSearch" \
    --role-arn arn:aws:iam::123456789012:role/OpenSearch-AdminRole

# Groupe 2 : Data Scientists (lecture seule sur tous les index)
aws cognito-idp create-group \
    --user-pool-id us-east-1_XXXXXXXXX \
    --group-name opensearch-scientists \
    --description "Lecture seule pour analyse" \
    --role-arn arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole

# Groupe 3 : Opérateurs de monitoring (lecture sur l'index monitoring seulement)
aws cognito-idp create-group \
    --user-pool-id us-east-1_XXXXXXXXX \
    --group-name opensearch-operators \
    --description "Monitoring opérationnel uniquement" \
    --role-arn arn:aws:iam::123456789012:role/OpenSearch-OperatorRole

# Créer un utilisateur et l'ajouter à un groupe
aws cognito-idp admin-create-user \
    --user-pool-id us-east-1_XXXXXXXXX \
    --username dr.dupont@hopital.ca \
    --user-attributes Name=email,Value=dr.dupont@hopital.ca Name=email_verified,Value=true \
    --temporary-password TempPassword123!

aws cognito-idp admin-add-user-to-group \
    --user-pool-id us-east-1_XXXXXXXXX \
    --username dr.dupont@hopital.ca \
    --group-name opensearch-scientists

4. Criar o Identity Pool

bash
# Créer l'Identity Pool (lie le User Pool aux rôles IAM)
aws cognito-identity create-identity-pool \
    --identity-pool-name opensearch-sante-identity \
    --allow-unauthenticated-identities false \
    --cognito-identity-providers '[
        {
            "ProviderName": "cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX",
            "ClientId": "YYYYYYYYYYYYYYYYYY",
            "ServerSideTokenCheck": true
        }
    ]' \
    --query "IdentityPoolId" --output text
# → ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ

# Associer les rôles IAM à l'Identity Pool
aws cognito-identity set-identity-pool-roles \
    --identity-pool-id "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ" \
    --roles authenticated=arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole \
    --role-mappings '{
        "cognito-idp.ca-central-1.amazonaws.com/ca-central-1_XXXXXXXXX:YYYYYYYYYYYYYYYYYY": {
            "Type": "Rules",
            "AmbiguousRoleResolution": "AuthenticatedRole",
            "RulesConfiguration": {
                "Rules": [
                    {
                        "Claim": "cognito:groups",
                        "MatchType": "Contains",
                        "Value": "opensearch-admins",
                        "RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-AdminRole"
                    },
                    {
                        "Claim": "cognito:groups",
                        "MatchType": "Contains",
                        "Value": "opensearch-operators",
                        "RoleARN": "arn:aws:iam::123456789012:role/OpenSearch-OperatorRole"
                    }
                ]
            }
        }
    }'

5. Controle de Acesso Granular no OpenSearch

NOTEFine-Grained Access Control (FGAC) — O OpenSearch permite controlar o acesso no nível do índice, do tipo de documento e até de campos individuais. Isso permite criar papéis personalizados que expõem apenas os dados necessários.
output
# Configurer les rôles OpenSearch via l'API REST

import requests, json

host  = 'https://votre-domaine.ca-central-1.es.amazonaws.com'
auth  = ('admin', 'VotreMotDePasseSecurise#123')

# Rôle 1 : Lecture seule sur l'index données-cardiaques
roles_payload = {
    "cluster_permissions": ["cluster:monitor/main"],
    "index_permissions": [
        {
            "index_patterns": ["donnees-cardiaques*"],
            "allowed_actions": [
                "read",
                "indices:data/read/search",
                "indices:data/read/msearch",
                "indices:admin/mappings/get"
            ]
        }
    ],
    "tenant_permissions": [
        {
            "tenant_patterns": ["global_tenant"],
            "allowed_actions": ["kibana_all_read"]
        }
    ]
}

# Créer le rôle "data-scientist-reader"
requests.put(
    f"{host}/_plugins/_security/api/roles/data-scientist-reader",
    json=roles_payload, auth=auth
)

# Mapper le rôle IAM Cognito vers le rôle OpenSearch
role_mapping = {
    "backend_roles": [
        "arn:aws:iam::123456789012:role/OpenSearch-DataScientistRole"
    ],
    "description": "Data Scientists ont accès en lecture aux données cardiaques"
}

requests.put(
    f"{host}/_plugins/_security/api/rolesmapping/data-scientist-reader",
    json=role_mapping, auth=auth
)

print("Rôle et mapping créés avec succès")

6. Ativar Cognito no OpenSearch

bash
# Activer l'authentification Cognito sur le domaine OpenSearch
aws opensearch update-domain-config \
    --domain-name sante-iot-dashboard \
    --cognito-options '{
        "Enabled": true,
        "UserPoolId": "ca-central-1_XXXXXXXXX",
        "IdentityPoolId": "ca-central-1:ZZZZZZZZ-ZZZZ-ZZZZ-ZZZZ-ZZZZZZZZZZZZ",
        "RoleArn": "arn:aws:iam::123456789012:role/CognitoAccessForAmazonOpenSearch"
    }'

# Attendre que la mise à jour soit terminée
aws opensearch describe-domain \
    --domain-name sante-iot-dashboard \
    --query "DomainStatus.Processing"
# Attendre que la valeur soit "false"

# Récupérer l'URL des Dashboards
aws opensearch describe-domain \
    --domain-name sante-iot-dashboard \
    --query "DomainStatus.Endpoints"
va-plus-loin

Este artigo cobre os trechos mais úteis — o curso completo AWS Dados em Tempo Real (6 capítulos, 14 lições, exercícios corrigidos e projeto final) leva você até o fim.

./acceder-au-cours-complet curso gratuito : Dominando o Claude Code

FAQ

Quanto tempo para aprender AWS Dados em Tempo Real?
Com uma progressão estruturada (6 capítulos, 14 lições curtas e práticas), você atinge um nível operacional em algumas semanas dedicando 30 a 60 minutos por dia. O importante é praticar cada conceito imediatamente.
Precisa de pré-requisitos?
Básicos de informática são suficientes. Se você sabe usar um terminal e ler código simples, está pronto.
Por onde começar na prática?
Reproduza os comandos deste artigo e depois siga o curso completo AWS Dados em Tempo Real: ele encadeia as 14 lições em ordem, com exercícios e projeto final.

📬 Quer receber este tipo de guia toda semana? Inscreva-se gratuitamente — código real, zero enrolação.