
kn_forum_44928_pyarrow_batch_url_import

KNIME forum (44928) - PyArrow, Batch, URL


Forum thread: https://forum.knime.com/t/error-when-trying-to-convert-pyarrow-table-to-knime-table-with-python-script-labs/44928/2?u=mlauber71

Conda environment setup (py38_knime_forum.yml):

# conda env create -f="/Users/m_lauber/Dropbox/knime-workspace/forum/2022/kn_forum_44928_pyarrow_batch_url_import/data/py38_knime_forum.yml"
# conda update -n py38_knime_forum --all
# conda env update --name py38_knime_forum --file "/Users/m_lauber/Dropbox/knime-workspace/forum/2022/kn_forum_44928_pyarrow_batch_url_import/data/py38_knime_forum.yml" --prune
# conda update -n base conda
# conda install -c conda-forge aiohttp

name: py38_knime_forum    # Name of the created environment
channels:                 # Repositories to search for packages
- conda-forge
dependencies:             # List of packages that should be installed
- python=3.8              # Python
- py4j                    # used for KNIME <-> Python communication
- nomkl                   # Prevents the use of Intel's MKL
- pandas                  # Table data structures
- jedi<=0.17.2            # Python script autocompletion
- python-dateutil         # Date and Time utilities
- numpy                   # N-dimensional arrays
- cairo                   # SVG support
- pillow                  # Image inputs/outputs
- matplotlib              # Plotting
- pyarrow>=6.0            # Arrow serialization
- IPython                 # Notebook support
- nbformat                # Notebook support
- scipy                   # Notebook support
- jpype1                  # Databases
- python-flatbuffers      # because tensorflow expects a version before 2
- h5py                    # must be < 3.0 because they changed whether str or byte is returned
- protobuf>3.12
- libiconv                # MDF Reader node
- asammdf>=5.19.14        # MDF Reader node
# - json
- pytest-asyncio
- aiohttp
# - re
# - random
- lxml
- typing

Script of the Python Script (Labs) node:

import knime_io as knio
import json
import asyncio
import aiohttp
from aiohttp import ClientError, ClientSession, TCPConnector, ClientTimeout, ClientConnectorError, ServerDisconnectedError, ClientResponseError
from aiohttp.http_exceptions import HttpProcessingError
import os
import re
from datetime import datetime
import random
import pandas
import pyarrow as pa
import time
from lxml import html
from typing import Dict, List, Optional, Tuple

HTTP_METHOD = "GET"
URL_COLUMN = "BuiltURL"

df = knio.input_tables[0].to_pandas()
data = pa.record_batch(df)

# Consumer coroutine - takes a task from the queue for consumption
async def consume(name, queue, session, results, event, len_urls):
    while True:
        status: Optional[int] = None
        body: Optional[str] = None
        error: Optional[str] = None
        try:
            print(f"Results = {len(results)}, URLs = {len_urls}")
            if len(results) >= len_urls:
                event.set()
            # Extract a task from the queue
            index, url, counter = await queue.get()
            async with session.request(method=HTTP_METHOD, url=url, allow_redirects=True, ssl=False) as response:
                status = response.status
                body = await response.text()
        except Exception as e:
            print("OTHER ERROR")
            print(str(e))
            error = getattr(e, "message", str(e))
        counter += 1
        try:
            if (status != 200 or error) and counter < 3:
                # Mark the task as done and put it back on the queue for another attempt
                queue.task_done()
                print(f"Adding {index} back to queue - counter: {str(counter)}")
                queue.put_nowait((index, url, counter))
            else:
                # Mark the task as done and record the result
                queue.task_done()
                results.append({"RowID": str(index),
                                # "BuiltURL": url,
                                "Status": int(status or 0),
                                "Body": str((body or "")[:100]),
                                "Date": str(datetime.now()),
                                "Error": str(error or ""),
                                "RetryCount": int(counter or 0)})
                print(f"Status {status} for index {index}\nError: {error}")
                print(queue.qsize())
        except Exception as fin_err:
            print(f"Error formatting result - {str(fin_err)}")

# Coroutine that waits for the event
async def waiter(event):
    print('Waiting for it ...')
    # Wait for the event to be set, meaning all requests are complete. Initially it is unset.
    await event.wait()
    print('... got it!')

# Where the magic happens
async def main():
    # Request timeout
    timeout = ClientTimeout(10)
    # Close underlying sockets after connection release - otherwise the IP isn't rotated for retries
    conn = TCPConnector(ssl=False, force_close=True)
    len_urls = len(urls)
    # Deduplicate the URLs; the enumerate index below serves as the RowID
    url_items = set(urls)
    # Number of consumers to spawn, depending on the number of tasks
    concurrency = 10 if len_urls >= 10 else len_urls
    print(f"Concurrency = {concurrency}")
    queue = asyncio.Queue()
    event = asyncio.Event()
    # List of consumer tasks
    consumer = []
    # List of results to return
    results = []
    async with ClientSession(connector=conn, timeout=timeout, raise_for_status=True) as session:
        # Producer: add one (index, url, retry counter) tuple per URL to the queue
        for index, url in enumerate(url_items):
            counter = 0
            queue.put_nowait((int(index), str(url), counter))
        # Consumers: handle the tasks in the queue
        for i in range(concurrency):
            consumer.append(asyncio.create_task(consume(f'worker-{i}', queue, session, results, event, len_urls)))
        # Wait until every queued task has been marked done (this triggers the event)
        await queue.join()
        # Spawn a task that waits until 'event' is set - this happens in the consume coroutine
        waiter_task = asyncio.create_task(waiter(event))
        # If there were inputs, wait until the waiter task has finished (i.e. the event is set)
        if len_urls:
            await waiter_task
        # Cancel all remaining worker tasks, there is nothing left to consume
        for task in consumer:
            task.cancel()
    return results

# We first have to create a table to which we can write the output batches created below.
output_table = knio.batch_write_table()

# Turn the first input table of the node into batches (use knio.input_tables[1] to batch the second one, etc.),
# then process the batches one by one.
for batch in data:
    # input_batch = batch.to_pyarrow()
    input_batch = batch
    schema = pa.schema({"RowID": pa.string(),
                        # "BuiltURL": pa.string(),
                        "Status": pa.int32(),   # int32: HTTP status codes can exceed the uint8 range
                        "Body": pa.string(),
                        "Date": pa.string(),
                        "Error": pa.string(),
                        "RetryCount": pa.uint8()})
    rowid_list = []
    status_list = []
    body_list = []
    date_list = []
    error_list = []
    retry_list = []
    output_data = []
    # ** The URL column of the current batch **
    urls = input_batch[URL_COLUMN]
    # Run the asyncio event loop for this batch
    gathered_results = asyncio.run(main())
    for iter_dicts in gathered_results:
        for key, val in iter_dicts.items():
            if key == "RowID":
                rowid_list.append(val)
            if key == "Status":
                status_list.append(val)
            if key == "Body":
                body_list.append(val)
            if key == "Date":
                date_list.append(val)
            if key == "Error":
                error_list.append(val)
            if key == "RetryCount":
                retry_list.append(val)
    output_data.append(pa.array(rowid_list))
    output_data.append(pa.array(status_list))
    output_data.append(pa.array(body_list))
    output_data.append(pa.array(date_list))
    output_data.append(pa.array(error_list))
    output_data.append(pa.array(retry_list))
    print(output_data)
    # output_table.append(pandas.DataFrame(gathered_results).set_index('RowID'))
    output_table.append(pa.table(output_data, schema=schema))

# knio.write_table(output_table)
knio.output_tables[0] = knio.write_table(output_table)
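The queue-based retry mechanics used in the script can also be exercised outside of KNIME. The following is a minimal standalone sketch of that pattern, not part of the workflow: the test URLs, the MAX_RETRIES value and the worker count are made-up placeholders.

import asyncio
from aiohttp import ClientSession, ClientTimeout

URLS = ["https://example.com", "https://example.org"]  # hypothetical test URLs
MAX_RETRIES = 3

async def worker(queue, session, results):
    while True:
        index, url, tries = await queue.get()
        status, error = None, None
        try:
            async with session.get(url, ssl=False) as resp:
                status = resp.status
                await resp.text()
        except Exception as exc:
            error = str(exc)
        if (status != 200 or error) and tries + 1 < MAX_RETRIES:
            # Re-queue the item for another attempt before marking this one as done
            queue.put_nowait((index, url, tries + 1))
        else:
            results.append({"index": index, "status": status, "error": error, "tries": tries + 1})
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    for i, url in enumerate(URLS):
        queue.put_nowait((i, url, 0))
    async with ClientSession(timeout=ClientTimeout(total=10)) as session:
        workers = [asyncio.create_task(worker(queue, session, results)) for _ in range(5)]
        # join() returns once every queued item (including re-queued retries) was marked done
        await queue.join()
        for task in workers:
            task.cancel()
    return results

if __name__ == "__main__":
    print(asyncio.run(main()))

Calling queue.task_done() exactly once per get(), and re-queuing a retry before marking the current item as done, is what lets queue.join() serve as the completion signal without a separate event.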
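For comparison, the plain batch-processing skeleton documented for the Python Script (Labs) node looks roughly like the sketch below (a minimal sketch, assuming the knime_io API of KNIME 4.5+); the pass-through append stands in for the real per-batch processing. The workflow's script above deviates from this skeleton by converting the whole input table to pandas first and turning it into a single record batch.

import knime_io as knio

output_table = knio.batch_write_table()

# Iterate over the first input table batch by batch instead of loading it all at once
for batch in knio.input_tables[0].batches():
    input_batch = batch.to_pyarrow()   # pyarrow data for this slice of rows
    # ... build the processed output for this batch here ...
    output_table.append(input_batch)   # placeholder: pass the batch through unchanged

knio.output_tables[0] = output_table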

Nodes

- Table Creator
- Python Script (Labs)
- Conda Environment Propagation
