
kn_example_python_read_parquet_file_2021

Use the new (KNIME 4.6+) Python Script node with the bundled Python environment to read a Parquet file into KNIME, export it again, load it into a SQLite database, and read it back.
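
Before the full node script, here is a minimal standalone sketch of the same round-trip outside KNIME; the file paths are hypothetical placeholders and pandas needs pyarrow (or fastparquet) installed to handle Parquet:

import pandas as pd
import sqlite3

# read a local Parquet file (hypothetical path)
df = pd.read_parquet("data/test_file.parquet")

# write it to a SQLite table and read it back
con = sqlite3.connect("data/db.sqlite")
df.to_sql("test_file", con, if_exists="replace")
df_back = pd.read_sql_query("SELECT * FROM test_file;", con)
con.close()

# export the round-tripped data to another Parquet file, gzip compressed
df_back.to_parquet("data/from_python_test_file.parquet", compression="gzip")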

'''
Example: import a Parquet file written from KNIME into Python and export it
back to SQLite and also to another Parquet file
'''

# Import knime.scripting.io to access node inputs and outputs.
import knime.scripting.io as knio

import numpy as np   # linear algebra
import os            # accessing directory structure
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
import pyarrow.parquet as pq
import sqlite3

# the ../data/ path from the KNIME flow variables - in the new knio style
var_path_data = knio.flow_variables['context.workflow.data-path']

# the name of the Parquet file, including the path
v_path_parquet_file = knio.flow_variables['v_path_parquet_file']

# import the local Parquet file into Python as the 'df' dataframe
df = pq.read_table(v_path_parquet_file).to_pandas()

# export the data to Parquet again - this time with GZIP compression
# you could also use a different path
v_second_parquet_file = var_path_data + "from_python_test_file.parquet"
df.to_parquet(v_second_parquet_file, compression='gzip')

# ----- SQLite ----------------------------------------

# the location of the SQLite database
var_loc_sqlite = var_path_data + "db.sqlite"

# define a function that connects you to the SQLite DB
# by default the schema used is - well - default
def sqlite_connectme(sqlite_database="default"):
    try:
        # first define the connection to the physical SQLite file on the file system
        global sqlite_connection
        sqlite_connection = sqlite3.connect(sqlite_database)
        # then define a cursor
        global sqlite_cursor
        sqlite_cursor = sqlite_connection.cursor()
    except Exception:
        print("failed to connect " + sqlite_database)

# establish a connection to the DB
sqlite_connectme(sqlite_database=var_loc_sqlite)

# define a function that drops the SQLite table you want to insert into
def sqlite_cleanup(sqlite_tabname="default"):
    try:
        sqlite_connection.execute("DROP TABLE IF EXISTS " + sqlite_tabname + ";")
        sqlite_connection.commit()
    except Exception:
        print("failed to clean SQLite DB")

# the name the new table should have
v_sqlite_table_name = "test_file"

# make sure the table is not already there
sqlite_cleanup(sqlite_tabname=v_sqlite_table_name)

# insert the df dataframe into the SQLite table
df.to_sql(name=v_sqlite_table_name, con=sqlite_connection, if_exists="replace")
sqlite_connection.commit()

# this will not have an immediate effect under KNIME,
# but you could work with these functions if you need them:
# list all tables that are in the SQLite file we accessed earlier
sqlite_table_list = sqlite_cursor.execute(
    "SELECT name FROM sqlite_master WHERE type='table';").fetchall()
print(sqlite_table_list)

# retrieve column information for the table
# every column is represented by a tuple with the following attributes:
# (id, name, type, notnull, default_value, primary_key)
# http://sebastianraschka.com/Articles/2014_sqlite_in_python_tutorial.html#querying-the-database---selecting-rows
sqlite_cursor.execute('PRAGMA TABLE_INFO({})'.format(v_sqlite_table_name))
sqlite_table_variables = [tup[1] for tup in sqlite_cursor.fetchall()]
print(sqlite_table_variables)

# fetch the data back from the SQLite table we created earlier
# hint: the SQLite DB stores the variable types
df_sqlite = pd.read_sql_query("SELECT * FROM " + v_sqlite_table_name + ";", sqlite_connection)

# see what types of variables are there
df_sqlite_types = df_sqlite.dtypes
# print(df_sqlite_types)

# close the SQLite database at the end
sqlite_connection.close()

# pass the transformed table to the first output port of the node
knio.output_tables[0] = knio.Table.from_pandas(df_sqlite)
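
As a design note: the script above manages the connection through globals and explicit commit/close calls. A more compact variant (a sketch, reusing var_loc_sqlite and df from above) lets context managers handle transaction and cleanup:

import sqlite3
from contextlib import closing

import pandas as pd

# closing() guarantees the connection is closed at the end; the inner
# 'with con' commits on success and rolls back on error (note that the
# connection context manager does not close the connection by itself)
with closing(sqlite3.connect(var_loc_sqlite)) as con:
    with con:
        df.to_sql("test_file", con, if_exists="replace")
    df_sqlite = pd.read_sql_query("SELECT * FROM test_file;", con)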
Split data from Python (Pandas dataframe) into several Parquet files

import knime.scripting.io as knio

import math
import os

import pandas as pd

# the target directory is obtained directly from the flow variable
var_path_file_location = knio.flow_variables['v_path_split_file']

df = knio.input_tables[0].to_pandas()

def save_df_in_chunks(df, chunk_size, directory):
    """
    Splits the DataFrame into chunks and saves each chunk as a separate
    Parquet file. Creates the directory if it does not exist.

    :param df: Pandas DataFrame to be saved.
    :param chunk_size: Number of rows in each chunk.
    :param directory: Directory where the Parquet files will be saved.
    """
    # create the directory if it does not exist
    if not os.path.exists(directory):
        os.makedirs(directory)

    num_chunks = math.ceil(len(df) / chunk_size)

    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = start_idx + chunk_size
        chunk = df.iloc[start_idx:end_idx]
        chunk.to_parquet(os.path.join(directory, f'chunk_{i}.parquet'))

# example usage
chunk_size = knio.flow_variables['var_chunk_size']  # define your chunk size
save_df_in_chunks(df, chunk_size, var_path_file_location)

knio.output_tables[0] = knio.Table.from_pandas(df)

Read the split Parquet files back into a single dataframe

import knime.scripting.io as knio

import os

import pandas as pd

def read_parquet_files(directory):
    """
    Reads all Parquet files in the specified directory and concatenates
    them into a single DataFrame.

    :param directory: Directory containing the Parquet files.
    :return: A single concatenated DataFrame.
    """
    # list all Parquet files in the directory (sorted for a deterministic order)
    parquet_files = sorted(f for f in os.listdir(directory) if f.endswith('.parquet'))
    # read each Parquet file into a DataFrame
    dataframes = [pd.read_parquet(os.path.join(directory, file)) for file in parquet_files]
    # concatenate all DataFrames into a single DataFrame
    return pd.concat(dataframes, ignore_index=True)

# example usage
directory = knio.flow_variables['v_path_split_file']
df = read_parquet_files(directory)

# pass the transformed table to the first output port of the node
knio.output_tables[0] = knio.Table.from_pandas(df)
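
Instead of listing and concatenating the files by hand, pyarrow can also read the whole directory as one dataset; a sketch, assuming the same directory of chunk files as above:

import pyarrow.parquet as pq

# read all Parquet files in the directory in a single call and convert to Pandas
df_all = pq.read_table(directory).to_pandas()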
The workflow annotations on the canvas summarize the steps: locate and create the /data/ folder with absolute paths; read the Parquet file ../data/test_file.parquet and export it to KNIME, to the SQLite database knime://knime.workflow/data/db.sqlite (table default.test_file), and again to Parquet (../data/from_python_test_file.parquet, gzip compressed); split the data into several Parquet files under ../data/test_file_split/ with var_chunk_size defining the chunk size; and read the split Parquet files back into Pandas from within Python.

The Collect Local Metadata part stores the Python environment as flow variables:

import sys

import numpy as np
import pandas as pd

import knime.scripting.io as knio

# store the versions of the Python environment as KNIME flow variables
# (converted to strings so they map to simple flow variable types)
knio.flow_variables['var_py_version_pandas'] = pd.__version__
knio.flow_variables['var_py_version_numpy'] = np.__version__
knio.flow_variables['var_py_version'] = str(sys.version_info)
knio.flow_variables['var_sys_path'] = str(sys.path)
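
To verify that the gzip-compressed copy written by the main script matches the original, a quick check (a sketch, reusing v_path_parquet_file and v_second_parquet_file from the main script above) could be:

import pandas as pd

# compare the original and the re-exported Parquet files
df_orig = pd.read_parquet(v_path_parquet_file)
df_copy = pd.read_parquet(v_second_parquet_file)

print(len(df_orig) == len(df_copy))            # same number of rows?
print(df_orig.dtypes.equals(df_copy.dtypes))   # same column types?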

Nodes

Collect Local Metadata
Python Script
Java Edit Variable (simple)
SQLite Connector
DB Table Selector
DB Reader
String to Path (Variable)
Parquet Reader
Parquet Writer
Test Data Generator
Variable to Table Row
Integer Configuration

Extensions

Links

https://forum.knime.com/t/widget-to-upload-multiple-files/69854/6?u=mlauber71
http://sebastianraschka.com/Articles/2014_sqlite_in_python_tutorial.html#querying-the-database---selecting-rows