Simulation de Streaming sur SSPCloud

Author

Ludovic Deneuville

1 Objectif

  • un dossier source contient 280 fichiers tweets-xxx.json
  • un dossier destination servira d’entrée pour le TP
  • créer un programme qui copie toutes les 2 secondes un fichier depuis source vers destination
    • cela simule un flux continu de données
    • quand tous les fichiers sont copiés, le dossier destination est vidé et ça recommence

Connectez-vous au SSPCloud

2 Créer un dossier destination

3 Comment se connecter à votre S3

  • import boto3
    s3 = boto3.client("s3",
                      endpoint_url = 'yyy',
                      aws_access_key_id= 'ttt', 
                      aws_secret_access_key= 'zzz', 
                      aws_session_token = 'xxx')

4 Lancer le programme de copie

import time
import boto3

<<< COLLER VOTRE CONFIG DE CONNEXION A S3 ICI >>>

bucket_source = "ludo2ne"
source_folder = "diffusion/ensai/stream_tweet"

<<< REMPLACER LE BUCKET ET LE FOLDER DESTINATION >>>

bucket_destination = "sspcloud_idep"
destination_folder = "destination"

print("Number of files : ")

while True:
    response = s3.list_objects_v2(Bucket=bucket_source, Prefix=source_folder)
    json_files = [obj["Key"] for obj in response["Contents"] if obj["Key"].endswith(".jsonl.gz")]

    for file in json_files:
        response_dest = s3.list_objects_v2(Bucket=bucket_destination, Prefix=destination_folder)
        json_dest_files = [obj["Key"] for obj in response_dest["Contents"] if obj["Key"].endswith(".jsonl.gz")]
        print(f"{len(json_dest_files)}", end=", ")
        if len(json_dest_files) % 20 == 0:
            print()
        
        file_name = file.split("/")[-1]    
        destination_object = f"{destination_folder}/{file_name}"
        
        s3.copy_object(CopySource={'Bucket': bucket_source, 'Key': file},
                       Bucket=bucket_destination,
                       Key=destination_object)
        #print(f"File '{file_name}' copied from '{source_folder}' to '{destination_folder}'")
    
        time.sleep(2)

    # Delete each JSON file from the bucket
    response_dest = s3.list_objects_v2(Bucket=bucket_destination, Prefix=destination_folder)
    json_dest_files = [obj["Key"] for obj in response_dest["Contents"] if obj["Key"].endswith(".jsonl.gz")]
    for json_file in json_dest_files:
        s3.delete_object(Bucket=bucket_destination, Key=json_file)
  • en cas de soucis, il suffit d’interrompte le kernel
  • il est également possible de supprimer puis recréer le dossier destination