hdfs dfs -put Gastos.csv /spark/pruebas
Cargar archivo local con Pandas en HDFS
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.master('spark://hostname:7077').appName('ejemplo').getOrCreate()
# df = spark.read.load('./Personas.csv', format='csv', sep='|', inferSchema='true', header='true')
df_pandas = pd.read_csv('./Personas.csv', sep = '|')
print(df_pandas)
df = spark.createDataFrame(df_pandas)
# df = spark.read.format("csv").option("sep", "|").option("inferSchema", "true").option("header", "true").load("./Personas.csv")
df.show(5)
df.write.option("delimiter", "|").mode("overwrite").parquet("/spark/pruebas/Personas.parquet")
Cargar SELECT SqlServer en HDFS
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# Create spark configuration object
conf = SparkConf()
conf.setMaster("spark://hostname:7077").setAppName("NombreDelProceso")
conf.set("spark.executor.memory", "10g")
conf.set("spark.driver.memory", "10g")
conf.set("spark.cores.max", "12")
# Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
# set variable to be used to connect the database
database = "BaseDeDatos"
table = "dbo.Tabla"
user = "usuario"
password = "password"
query = '''
SELECT
Campo1, Campo2, Campo3
FROM BaseDeDatos.dbo.Tabla WITH (NOLOCK)
WHERE Fecha >= 202104
'''
# read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
.option("url", f"jdbc:sqlserver://hostname:1433;databaseName={database};") \
.option("query", query) \
.option("user", user) \
.option("password", password) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
# show the data loaded into dataframe
jdbcDF.show(3)
jdbcDF.write.option("delimiter", "|").mode("overwrite").parquet("/Carpeta/OtraCarpeta/Archivo.parquet")
Cargar tabla entera de SqlServer en HDFS
# import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# Create spark configuration object
conf = SparkConf()
conf.setMaster("spark://hostname:7077").setAppName("NombreDelProceso")
conf.set("spark.executor.memory", "10g")
conf.set("spark.driver.memory", "10g")
conf.set("spark.cores.max", "12")
# Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
# set variable to be used to connect the database
database = "BaseDeDatos"
table = "dbo.Tabla"
user = "usuario"
password = "password"
# read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
.option("url", f"jdbc:sqlserver://hostname:1433;databaseName={database};") \
.option("dbtable", table) \
.option("user", user) \
.option("password", password) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
# show the data loaded into dataframe
jdbcDF.show(3)
jdbcDF.write.option("delimiter", "|").mode("overwrite").parquet("/carpeta/OtraCarperta/Archivo.parquet")
# Otra Opcion
jdbcDF.write.option("delimiter", "|").mode("overwrite").csv("/carpeta/OtraCarperta/Personas.csv")
ABM/CRUD simple con python, Sqlite y SqlAlchemy
main.py
from productos import Producto
nuevoProducto = Producto()
nuevoProducto.set_producto('Producto 1', 10.5, 'Descripcion producto 1')
res = nuevoProducto.get_productos_all()
for p in res:
print(p)
print(nuevoProducto.get_productos('Producto 1'))
nuevoProducto.del_producto(5)
print(nuevoProducto.get_productos('Producto 1'))
print(nuevoProducto.get_productos('Producto 3'))
nuevoProducto.mod_producto(3, 'ExProducto 3', 210, 'Descripcion del ex producto 3')
print(nuevoProducto.get_productos('ExProducto 3'))
Ordenar el codigo autopep8
Order el codigo autopep8
Ejecutar comando por script para ordenar el codigo.
Order el codigo autopep8
Leer configuración sin cargalas en entorno virtual
main.py
from os import path, getenv
from os.path import join
from dotenv import dotenv_values
dotenv_path = join('src', '.env.local')
variables = dotenv_values(dotenv_path)
print(variables)
print(variables['PG_PASSWORD'])
Cargar variables de entorno dentro del entorno virtual
main.py
from os import path, getenv
from os.path import join
from dotenv import load_dotenv
from personas import Personas
dotenv_path = join('src', '.env.local')
load_dotenv(dotenv_path=dotenv_path)
print('Valor de la varaibles =>', getenv('SQL_CONN_EXAMPLE'))
p = Personas()
print('Resultado de la clase Personas =>', p.get_connstr())
Sql Server y Python con pyodbc
import pyodbc
import json
class Data(object):
def __init__(self):
pass
def conectarSql(self):
self.server = 'server01'
self.db = 'baseDeDatos01'
self.usuario = None
self.contraseña = None
try:
#self.conexion = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+ self.server +';DATABASE=' + self.db +';UID='+ self.usuario + ';PWD='+ self.contraseña + "Trusted_Connection=yes;")
self.conexion = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+ self.server +';DATABASE=' + self.db +';Trusted_Connection=yes;')
return self.conexion
except:
self.mensaje = {}
return json.loads(self.mensaje)
def ejecutarSP(self, spName : str):
self.mensaje = dict()
self.con = self.conectarSql()
if isinstance(self.con, dict):
return self.con
else:
try:
self.res = self.con.execute('EXECUTE {}'.format(spName)).commit()
self.con.close()
self.mensaje = {"code": 200, "message": "Ejecucion existosa"}
except:
self.mensaje = {"code": 200, "message": "Error en la ejecucion"}
return self.mensaje
def logUltimaEjecucion(self):
self.query = """SELECT IdDetalleEjecucionProceso, IdEjecucionProceso, Proceso, Subproceso, Periodo, Mensaje, Inicio, Fin, Estado, Resultado
FROM [log].[DetalleEjecucionProceso] WITH (NOLOCK)
WHERE IDEjecucionProceso in (SELECT MAX(IDEjecucionProceso) FROM [log].[EjecucionProceso] WITH (NOLOCK)) ORDER BY Inicio DESC"""
self.con = self.conectarSql()
if isinstance(self.con, dict):
return self.con
else:
try:
self.datos = list()
self.res = self.con.execute(self.query)
for d in self.res:
self.datos.append({"IdDetalleEjecucionProceso": d[0],\
"IdEjecucionProceso": d[1],\
"Proceso": d[2],\
"Subproceso": d[3],\
"Periodo": d[4],\
"Mensaje": d[5],\
"Inicio": d[6],\
"Fin": d[7],\
"Estado": d[8],\
"Resultado": d[9]})
self.con.close()
self.mensaje = {"code": 200, "message": "Ejecucion existosa"}
self.mensaje["datos"] = self.datos
return self.mensaje
except:
return {"code": 401, "message": "Error en la ejecucion"}
Borrar archivos con mas de N días
# importing the required modules
import os
import glob
import time
# main function
def main():
dias = 7
archivosBorrados = list()
segundosLimite = time.time() - (dias * 24 * 60 * 60)
archivos = glob.glob("Archivos\\archivo_2022*.pdf")
for a in archivos:
fechaMod = time.ctime(os.path.getmtime(a))
if (segundosLimite >= os.path.getmtime(a)):
archivosBorrados.append(a)
# borrar archivo
os.remove(a)
print(archivosBorrados)
if __name__ == '__main__':
main()