Icon

kn_example_python_read_parquet_file

Use Python to read a Parquet file into KNIME

Use Python to read a Parquet file into KNIME, export it again, put it into a SQLite database and read it back

"""Use Python to read a Parquet file into KNIME, export it again, put it into
a SQLite database and read it back.

The script reads ``var_path_data`` and ``v_path_parquet_file`` from the
``flow_variables`` mapping (KNIME flow variables) and hands its result back
through the module-level name ``output_table``.
"""
import os  # accessing directory structure
import sqlite3

import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
import pyarrow.parquet as pq

# the ../data/ path from the KNIME flow variables
var_path_data = flow_variables['var_path_data']
# the name of the parquet file including the path
v_path_parquet_file = flow_variables['v_path_parquet_file']

# import the local parquet file into Python
df = pq.read_table(v_path_parquet_file).to_pandas()

# export the file again to Parquet - this time using GZIP
# you could also try and use a different path
v_second_parquet_file = v_path_parquet_file + ".gz"
df.to_parquet(v_second_parquet_file, compression='gzip')

# ----- SQLite ----------------------------------------
# ----- location of SQLite Database
# define some basic variables
# the location of the SQLite database
# os.path.join gives the same result as plain concatenation when the data
# path already ends in a separator, and inserts the separator when it does not
var_loc_sqlite = os.path.join(var_path_data, "db.sqlite")

# Module-level handles filled in by sqlite_connectme(); they stay None until a
# connection has been established so that later code can test for it.
sqlite_connection = None
sqlite_cursor = None


def sqlite_connectme(sqlite_databse="default"):
    """Open the SQLite file and publish connection and cursor as globals.

    Args:
        sqlite_databse: path of the SQLite database file to connect to.

    Returns:
        None. On success the module-level names ``sqlite_connection`` and
        ``sqlite_cursor`` are set. On failure a message is printed and the
        globals are left untouched.
    """
    global sqlite_connection, sqlite_cursor
    try:
        # First we define the connection to the physical SQLite file
        # on the file system
        sqlite_connection = sqlite3.connect(sqlite_databse)
        # then we have to define a cursor
        sqlite_cursor = sqlite_connection.cursor()
    except sqlite3.Error as err:
        # only database errors are expected here; report the reason instead
        # of hiding every possible exception behind a bare except
        print("failed to connect " + sqlite_databse + ": " + str(err))


def sqlite_cleanup(sqlite_tabame="default"):
    """Drop the given table from the connected SQLite database if it exists.

    Args:
        sqlite_tabame: name of the table to drop. The name is inserted into
            the SQL text unchanged, so it must come from a trusted source.

    Returns:
        None. Failures are reported with a printed message, not raised.
    """
    if sqlite_connection is None:
        # nothing to clean without a connection; keep the best-effort contract
        print("failed to clean SQLite DB")
        return
    try:
        sqlite_connection.execute("DROP TABLE IF EXISTS " + sqlite_tabame + ";")
        sqlite_connection.commit()
    except sqlite3.Error as err:
        print("failed to clean SQLite DB: " + str(err))


# establish a connection to the DB
sqlite_connectme(sqlite_databse=var_loc_sqlite)
if sqlite_connection is None:
    # every following step needs the connection, so stop with a clear error
    raise RuntimeError("no SQLite connection to " + var_loc_sqlite)

# what name should the new table have
v_sqlite_table_name = "test_file"

try:
    # make sure the table is not there
    sqlite_cleanup(sqlite_tabame=v_sqlite_table_name)

    # insert the df dataframe into the SQLite table
    df.to_sql(name=v_sqlite_table_name, con=sqlite_connection, if_exists="replace")
    sqlite_connection.commit()

    # this will not have an immediate effect under KNIME
    # but you could work with these functions if you need them

    # list all tables that are in the SQLite file we accessed earlier
    sqlite_table_list = sqlite_cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table';"
    ).fetchall()
    print(sqlite_table_list)

    # Retrieve column information of our desired file
    # Every column will be represented by a tuple with the following attributes:
    # (id, name, type, notnull, default_value, primary_key)
    # http://sebastianraschka.com/Articles/2014_sqlite_in_python_tutorial.html#querying-the-database---selecting-rows
    sqlite_cursor.execute('PRAGMA TABLE_INFO({})'.format(v_sqlite_table_name))
    sqlite_table_variables = [tup[1] for tup in sqlite_cursor.fetchall()]
    print(sqlite_table_variables)

    # fetch data from SQLite data we named earlier
    # hint: the SQLite DB stores the variable types
    df_sqlite = pd.read_sql_query(
        "select * from " + v_sqlite_table_name + ";", sqlite_connection
    )

    # see what types of variables are there
    df_sqlite_types = df_sqlite.dtypes
    print(df_sqlite_types)
finally:
    # close the SQLite data base at the end, also when a step above failed
    sqlite_connection.close()

output_table = df_sqlite.copy()

# ----- labels captured from the workflow page alongside the script -----
# (these look like workflow annotations and node names - kept for reference)
#   v_path_parquet_file
#   dummy data
#   test_file.parquet
#   knime://knime.workflow/data/db.sqlite
#   default.test_file
#   ../data/test_file.parquet
#   import the original parquet file
#   read Parquet file and export to KNIME and SQLite
#   ../data/test_file.parquet.gz
#   import the parquet file written with Python
#   collect meta data
#   Java Edit Variable (simple)
#   Data Generator
#   Parquet Writer
#   SQLite Connector
#   DB Table Selector
#   DB Reader
#   Parquet Reader
#   Python Source
#   Parquet Reader

Nodes

Extensions

Links