Dominando Apache Spark (VII): Funciones para cargar y exportar datos en PySpark

Nov 11, 2023 | Big Data, Python | 0 Comentarios

En el procesamiento de datos a gran escala, PySpark se ha convertido en una de las herramientas preferidas. Su capacidad para manejar grandes volúmenes de datos y ejecutar tareas de procesamiento distribuido lo convierte en una elección ideal para científicos de datos y profesionales del análisis.

Parte fundamental del flujo de trabajo en PySpark es la habilidad para cargar y exportar datos de diversas fuentes y formatos. En este artículo, exploraremos funciones avanzadas que PySpark ofrece para facilitar la importación y exportación de datos, lo que te permitirá aprovechar al máximo este poderoso framework.

 

Funciones básicas de carga y exportación

PySpark proporciona funciones básicas para cargar y exportar datos desde y hacia diversas fuentes. Algunas de las operaciones más comunes incluyen

Carga de Datos

Lectura de Datos desde Archivos: PySpark permite leer datos desde archivos en diferentes formatos, como CSV, JSON, Parquet, Avro, y más. La función read de SparkSession se utiliza para cargar datos. Aquí un ejemplo:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate() 
df = spark.read.csv("datos.csv", header=True, inferSchema=True)

A continuación, se detallan algunos aspectos importantes relacionados con la función read en PySpark:

  • Modos de carga: PySpark admite varios modos de carga, como .load(), .json(), .csv(), etc., según el formato del fichero. Estos métodos son atajos que simplifican la lectura de datos de formatos específicos. Por ejemplo, .load() es el método genérico que se puede usar para varios formatos.
  • Esquema y tipado: PySpark intentará inferir el esquema de los datos de forma automática al cargarlos. Si es necesario, también puedes especificar un esquema personalizado para los datos cargados.
  • Opciones personalizadas: Puedes utilizar el método .option("clave", "valor") para especificar opciones personalizadas al cargar un fichero. Por ejemplo, si estás leyendo un archivo CSV, puedes configurar opciones para delimitadores, comprobación de encabezados, esquemas personalizados, etc. Estas opciones pueden variar según el formato del fichero que estás cargando.

Conexión a Bases de Datos: Puedes conectar PySpark a bases de datos relacionales, como MySQL, PostgreSQL, o SQL Server. La función jdbc te permite cargar datos directamente desde una tabla. Por ejemplo:

df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/db").option("dbtable", "tabla").option("user", "usuario").option("password", "contraseña").load()

A continuación, se explican algunos aspectos importantes sobre esta técnica:

  • URL de conexión: El parámetro "url" en el diccionario conexion_db debe contener la URL de conexión a tu base de datos. La estructura de la URL dependerá del tipo de base de datos que estés utilizando. Debes proporcionar detalles como el servidor, el puerto y el nombre de la base de datos a la que deseas conectarte.
  • Usuario y contraseña: Proporciona las credenciales de usuario y contraseña de la base de datos en los parámetros "user" y "password" del diccionario conexion_db.
  • Consulta SQL: Define la consulta SQL que deseas ejecutar en el parámetro "consulta". Esta consulta puede ser tan simple o compleja como necesites, y PySpark la utilizará para seleccionar datos de la base de datos.
  • Método de formato y opciones: Al usar .format("jdbc"), le indicas a PySpark que deseas conectarte a una base de datos. Luego, con .options(**conexion_db).option("dbtable", consulta), configuras las opciones de conexión y especificas la tabla o vista de la base de datos de la que deseas cargar los datos.

 

Exportación de datos

Guardado de Datos en Archivos: PySpark te permite guardar un DataFrame en varios formatos, como Parquet, Avro, CSV o JSON. Utiliza la función write para realizar esta operación. Ejemplo:

df.write.parquet("resultado.parquet")

Guardado en Bases de Datos: Similar a la carga, puedes guardar datos en bases de datos utilizando la función jdbc. Aquí hay un ejemplo:

df.write.format("jdbc").option("url", "jdbc:mysql://localhost/db").option("dbtable", "nuevatabla").option("user", "usuario").option("password", "contraseña").save()

 

Opciones de lectura avanzadas

  1. Lectura de Datos Paralela: Para acelerar la importación de datos, PySpark permite la lectura paralela de múltiples archivos en un directorio. Puedes utilizar la función read.option("basePath", "directorio").load("directorio/*") para realizar esta operación.
  2. Manejo de Datos Corruptos: En situaciones donde los archivos de origen pueden contener datos corruptos, PySpark te permite definir reglas de manejo de errores utilizando la opción .option("mode", "PERMISSIVE|DROPMALFORMED|FAILFAST").
  3. Muestreo de Datos: Si necesitas trabajar con un subconjunto de tus datos, PySpark ofrece la opción .sample() para tomar muestras aleatorias de tus datos durante la importación.

 

Exportación con particionamiento

Cuando trabajas con conjuntos de datos grandes, el particionamiento es fundamental para un rendimiento óptimo. PySpark proporciona opciones avanzadas para particionar datos durante la exportación:

  1. Particionamiento por Columnas: Puedes especificar una o varias columnas para particionar tus datos. Por ejemplo, si deseas particionar por la columna «año» y «mes», puedes utilizar la función .partitionBy("año", "mes") al exportar los datos.
  2. Particionamiento por Nivel: Puedes controlar la profundidad de particionamiento en directorios utilizando la función .bucketBy(n, "columna"). Esto puede ser útil para optimizar las consultas posteriores.

 

Ejemplos con S3 y base de datos MongoDB

A continuación, exploraremos algunos ejemplos avanzados de importación y exportación de datos en PySpark con Amazon S3 y MongoDB.

Ejemplo 1: Importación y exportación de desde Amazon S3

Amazon S3 es un popular servicio de almacenamiento en la nube, y PySpark facilita la importación y exportación de datos desde este servicio. Para cargar datos desde S3, puedes usar la función read con la URL de S3:

df = spark.read.csv("s3a://mi-bucket/mi-archivo.csv", header=True, inferSchema=True)

Para exportar datos a S3, utiliza la función write:

df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/db").option("dbtable", "tabla").option("user", "usuario").option("password", "contraseña").load()

Ejemplo 2: Importación y Exportación de Datos desde/hacia MongoDB

PySpark ofrece conectores para bases de datos NoSQL populares, como MongoDB. Puedes cargar datos desde MongoDB de la siguiente manera:

df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/db").option("dbtable", "tabla").option("user", "usuario").option("password", "contraseña").load()

Para exportar datos a MongoDB:

df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/db").option("dbtable", "tabla").option("user", "usuario").option("password", "contraseña").load()

Estos son solo dos ejemplos de las numerosas opciones avanzadas que PySpark brinda para la importación y exportación de datos.

Con este artículo, has aprendido sobre las funciones avanzadas de importación y exportación de datos en PySpark, lo que te permitirá manejar datos de manera más efectiva en tus proyectos de ciencia de datos y análisis.

 

A continuación tenéis todas las publicaciones de esta serie hasta ahora:

También te puede interesar:

5 trucos para Jupyter Notebook que no debes perderte

Descubre 5 trucos para Jupyter Notebook no tan conocidos que te ayudarán familiarizarte más con esta herramienta y ser más eficiente.

Extracción de datos de sitios web con Scrapy (I): recopilando información de productos de Zara

En este tutorial explicamos como extraer con Scrapy los datos de la web de Zara de forma automatizada para finalmente almacenarlos en MongoDB.

Otros 5 trucos de Jupyter Notebook que probablemente desconozcas

En este artículo se explican 5 nuevos trucos para Jupyter Notebook que probablemente desconozcas y que mejorarán vuestra productividad.

Extracción de datos de sitios web con Scrapy (II): rastreando todas las URLs del sitio web de Zara

En este post construimos un rastreador con Scrapy que extrae los datos de manera recursiva todas las URLs del sitio web de Zara.

Extracción de datos de Twitter con Python (sin consumir la API)

En esta publicación os enseñaremos como poder extraer datos de Twitter en Python mediante la librería Twint. De esta forma, podremos obtener facilmente los últimos tweets que contengan cierta palabra o que pertenezcan a un determinado usuario y aplicar varios filtros.

Aplicación de la Distribución t de Student en Python: Ejemplos prácticos

Distribución t en Python: Aprende cómo aplicar la Distribución t en Python con ejemplos para pruebas de hipótesis e intervalos de confianza.

Dominando Apache Spark (I): Introducción y ventajas en el procesamiento de grandes volúmenes de datos

En el artículo, exploramos la historia y ventajas de Apache Spark como un marco de procesamiento de datos de código abierto. Destacamos su evolución y las razones para su popularidad en el procesamiento de datos a gran escala, incluyendo su velocidad y capacidad de procesamiento en memoria.

Las tendencias de Big Data y Machine Learning que dominarán en 2024

Descubre las tendencias de Big Data y Aprendizaje Automático que marcarán el rumbo en 2024, y cómo prepararte para el futuro tecnológico en este artículo informativo.

Dominando Apache Spark (II): Funcionamiento interno y arquitectura

En este artículo profundizamos en la arquitectura y el funcionamiento interno de Spark, destacando componentes clave.

Dominando Apache Spark (III): Explorando RDD (Resilient Distributed Datasets) y su poder en el procesamiento de datos

En este artículo exploramos RDD, una estructura fundamental para el procesamiento de datos distribuidos. Descubre cómo RDDs permiten la manipulación y transformación de datos a gran escala.

Ads Blocker Image Powered by Code Help Pro

Por favor, permite que se muestren anuncios en nuestro sitio web

Querido lector,

Esperamos que estés disfrutando de nuestro contenido. Entendemos la importancia de la experiencia sin interrupciones, pero también queremos asegurarnos de que podamos seguir brindándote contenido de alta calidad de forma gratuita. Desactivar tu bloqueador de anuncios en nuestro sitio nos ayuda enormemente a lograrlo.

¡Gracias por tu comprensión y apoyo!