# Build a DataFrame from (id, name) tuples plus a plain list of column names.
rows = [(1, 'Aman'), (2, 'Akash')]
columns = ['id', 'Name']
df = spark.createDataFrame(rows, columns)
df.show()
# Build a DataFrame from a list of dicts; column names are taken from the keys.
records = [{'id': 1, 'Name': 'Aman'}, {'id': 2, 'Name': 'Akash'}]
df = spark.createDataFrame(records)
df.show()
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Build a DataFrame from dicts with an explicit StructType schema.
data = [{'id': 1, 'Name': 'Aman'}, {'id': 2, 'Name': 'Akash'}]
header_schema = StructType([
    StructField(name='id', dataType=IntegerType()),
    StructField(name='Name', dataType=StringType()),
])
# Bug fix: the schema was built but never passed to createDataFrame,
# so Spark was inferring types instead of using it.
df = spark.createDataFrame(data, schema=header_schema)
df.show()
# Read a single CSV that has a header row, letting Spark infer column types.
csv_path = 'dbfs:/FileStore/tables/effects_of_covid_19_on_trade_at_15_december_2021_provisional.csv'
df = spark.read.csv(csv_path, header=True, inferSchema=True)
display(df)
# Equivalent read using the generic format/option/load API.
csv_path = 'dbfs:/FileStore/tables/effects_of_covid_19_on_trade_at_15_december_2021_provisional.csv'
df = spark.read.format('csv').option('header', True).load(csv_path)
display(df)
# Reading several files in one call: all files must share the same schema.
first_path = 'dbfs:/FileStore/tables/effects_of_covid_19_on_trade_at_15_december_2021_provisional.csv'
second_path = 'dbfs:/FileStore/tables/effects_of_covid_19_on_trade_at_15_december_2021_provisional.csv'
df = spark.read.csv(path=[first_path, second_path], header=True, inferSchema=True)
display(df)
# Reading a whole folder: every file inside must share the same schema.
source_dir = 'dbfs:/FileStore/tables/'
df = spark.read.csv(path=source_dir, header=True, inferSchema=True)
display(df)
# NOTE: We can also pass a schema if we want to read only some columns.
# Create a DataFrame and write it out as CSV.
data = [(1, 'Aman'), (2, 'Akash')]
header_schema = ['id', 'Name']
path = 'dbfs:/FileStore/tables/maydata'
df = spark.createDataFrame(data, header_schema)
# Fix: 'header' was previously passed twice (via .option AND the csv() kwarg);
# specify it once. Mode 'overwrite' (canonical lowercase) replaces any
# existing output under `path`.
df.write.csv(path, header=True, mode='overwrite')
# For reading the data back you can just provide the folder path.
# NOTE: Read/write operations for JSON follow the same pattern.
path = 'dbfs:/FileStore/tables/effects_of_covid_19_on_trade_at_15_december_2021_provisional.csv'
df = spark.read.csv(path=path, header=True, inferSchema=True)
# Fixed comment: n is the number of ROWS to display (not columns);
# truncate=False prints full cell values; vertical=True prints one
# column:value pair per line instead of a table.
df.show(n=4, truncate=False, vertical=True)
from pyspark.sql.functions import col

# Derive a new column from an existing one; .cast('Integer') could be
# chained onto the expression to change its type as well.
csv_path = 'dbfs:/FileStore/tables/effects_of_covid_19_on_trade_at_15_december_2021_provisional.csv'
df = spark.read.csv(path=csv_path, header=True, inferSchema=True)
df = df.withColumn('Updated Value', col('Value') / 100)
df.show(n=2, vertical=True)
# Rename a single column; all other columns pass through unchanged.
csv_path = 'dbfs:/FileStore/tables/effects_of_covid_19_on_trade_at_15_december_2021_provisional.csv'
df = spark.read.csv(path=csv_path, header=True, inferSchema=True)
df = df.withColumnRenamed('Country', 'RenamedCountry')
df.show()
# NOTE: You can also use ArrayType(IntegerType()) as a field's dataType.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Build a DataFrame from dicts with an explicit StructType schema.
data = [{'id': 1, 'Name': 'Aman'}, {'id': 2, 'Name': 'Akash'}]
header_schema = StructType([
    StructField(name='id', dataType=IntegerType()),
    StructField(name='Name', dataType=StringType()),
])
# Bug fix: the schema was built but never passed to createDataFrame,
# so Spark was inferring types instead of using it.
df = spark.createDataFrame(data, schema=header_schema)
df.show()
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Read the CSV with a user-supplied schema instead of inferSchema; only
# the listed columns/types are applied.
csv_path = 'dbfs:/FileStore/tables/effects_of_covid_19_on_trade_at_15_december_2021_provisional.csv'
read_schema = StructType([
    StructField(name='RenamedCountry', dataType=StringType()),
    StructField(name='Value', dataType=IntegerType()),
])
df = spark.read.csv(path=csv_path, header=True, schema=read_schema)
df.display()