-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bug in Lib\site-packages\arcgis\_impl\common\_query.py #1895
Comments
@hildermesmedeiros Can you provide data where this is happening because we are not able to reproduce with any of our layers. |
@nanaeaubry However, just by reading the code, it seems clear to me that the error (duplicate data that doesn't exist in the service) is coming from there. I will try to create an example tomorrow.... I think creating a feature layer with 10k rows and randomly generating errors in the while loop might be enough to prove this. |
@nanaeaubry import pandas as pd
import numpy as np
import random
from faker import Faker
import wrapt
from functools import partial
import time
from arcgis.gis import GIS
from arcgis.features import FeatureLayer
from dask import delayed, compute
from dask.diagnostics import ProgressBar
from requests.adapters import HTTPAdapter
from requests.models import Response
# Sign in to ArcGIS using the active ArcGIS Pro session credentials.
gis = GIS('pro')

# Initialize Faker and seed both RNGs so the fake data is reproducible
# (requires the `faker` package to be installed first).
print('creating fake data')
fake = Faker()
np.random.seed(42)
Faker.seed(42)

# Build a synthetic 100k-row weather-observation table.
num_rows = 100000
data = {
    'ct_id': np.random.randint(1, 10000, num_rows),
    'ct_name': [fake.city() for _ in range(num_rows)],
    'ct_state': [fake.state_abbr() for _ in range(num_rows)],
    'ct_country': ['USA'] * num_rows,
    'data_date': pd.date_range(start='2022-01-01', periods=num_rows, freq='H'),
    'data_date_br': pd.date_range(start='2022-01-01', periods=num_rows, freq='H').strftime('%d/%m/%Y %H:%M:%S'),
    'data_humidity': np.random.uniform(10, 100, num_rows),
    'data_pressure': np.random.uniform(950, 1050, num_rows),
    'data_rain_precipitation': np.random.uniform(0, 50, num_rows),
    'data_wind_velocity': np.random.uniform(0, 40, num_rows),
    'data_wind_direction': [random.choice(['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']) for _ in range(num_rows)],
    'data_wind_directiondegrees': np.random.uniform(0, 360, num_rows),
    'data_wind_gust': np.random.uniform(0, 50, num_rows),
    'data_temperature': np.random.uniform(-10, 40, num_rows),
}
# Generate the ct_key by combining longid (ct_id) and datetime in milliseconds.
# Each hourly timestamp is distinct, so ct_key is intended to be unique per row;
# duplicated ct_key values coming back from the service would indicate a fetch bug.
data['ct_key'] = [
    f"{data['ct_id'][i]}{int(time.mktime(data['data_date'][i].timetuple()) * 1000)}"
    for i in range(num_rows)
]
# Create the DataFrame
df = pd.DataFrame(data)
# Set appropriate data types (pandas nullable extension dtypes).
df['ct_id'] = df['ct_id'].astype('Int32')
df['ct_name'] = df['ct_name'].astype('string')
df['ct_state'] = df['ct_state'].astype('string')
df['ct_country'] = df['ct_country'].astype('string')
df['data_date'] = pd.to_datetime(df['data_date'])
df['data_date_br'] = df['data_date_br'].astype('string')
df['data_humidity'] = df['data_humidity'].astype('Float64')
df['data_pressure'] = df['data_pressure'].astype('Float64')
df['data_rain_precipitation'] = df['data_rain_precipitation'].astype('Float64')
df['data_wind_velocity'] = df['data_wind_velocity'].astype('Float64')
df['data_wind_direction'] = df['data_wind_direction'].astype('string')
df['data_wind_directiondegrees'] = df['data_wind_directiondegrees'].astype('Float64')
df['data_wind_gust'] = df['data_wind_gust'].astype('Float64')
df['data_temperature'] = df['data_temperature'].astype('Float64')
df['ct_key'] = df['ct_key'].astype('string')

# Publish the DataFrame as a hosted table, then query it back.
print('publishing fake content')
fc = gis.content.import_table(df,service_name='testrow', title='test row')
table_url = fc.tables[0].url
# table_url = 'https://services.arcgis.com/qFQYQQeTXZSPY7Fs/arcgis/rest/services/testrow/FeatureServer/0'
fl = FeatureLayer(table_url, gis=gis)
print('-------starting-----')
# Baseline: let the API page through everything itself. Per the transcript
# below, this returned 99000 rows instead of num_rows in the reported run.
df1 = fl.query().sdf
print(f'should be = {num_rows}')
print(f"df1: {df1.ObjectId.count()}") #missing data
##### try to get with dask in batchs (batchs of 1000)
class DaskFetcherError(Exception):
    """Raised when fetching feature-layer data in parallel batches fails.

    The triggering message is kept on the ``message`` attribute in addition
    to the normal ``Exception`` args, so callers can read it directly.
    """

    def __init__(self, message):
        self.message = message
        super().__init__(message)
def retry_decorator(wrapped=None, *, retry_num: int = 4, retry_sleep_sec: float = 10, backoff=True, backoff_factor: float=1.):
    """
    Retry decorator.

    :param retry_num: integer number of retry attempts
    :param retry_sleep_sec: number of seconds to wait between attempts
    :param backoff: True to enable exponential waiting retry_sleep_sec * backoff_factor * (2 ** (attempt - 1))
    :param backoff_factor: multiplier applied to the sleep interval when backoff is on
    :return: the decorated callable with retry behaviour applied
    """
    # Called without arguments (plain @retry_decorator): wrapped is the function.
    # Called with arguments (@retry_decorator(...)): wrapped is None, so return a
    # partially-applied version of ourselves to act as the actual decorator.
    if wrapped is None:
        return partial(retry_decorator, retry_num=retry_num, retry_sleep_sec=retry_sleep_sec,
                       backoff=backoff, backoff_factor=backoff_factor)

    @wrapt.decorator
    def wrapper(wrapped, instance, args, kwargs):
        try_error = False  # ensures the "failed" banner is printed only once
        for attempt in range(1, retry_num + 1, 1):
            try:
                return wrapped(*args, **kwargs)
            except Exception as err:
                if not try_error:
                    # arcpy.AddMessage(f'def {wrapped.__name__} falhou')
                    print(f'def {wrapped.__name__} falhou')  # "falhou" = "failed" (pt-BR)
                    try_error = True
                # NOTE(review): these parameter checks only run after a failure, and
                # `assert` is stripped under `python -O` — consider raising instead.
                assert retry_num >= 1
                assert retry_sleep_sec > 0.
                assert backoff_factor >= 1.
                if backoff:
                    # Exponential backoff: the wait doubles with every failed attempt.
                    total_sleep_sec = retry_sleep_sec * backoff_factor * (2 ** (attempt - 1))
                else:
                    total_sleep_sec = retry_sleep_sec
                # arcpy.AddMessage(f'Retrying def {wrapped.__name__}, attempt {attempt}, sleeping {total_sleep_sec:.2f}s')
                print(f'Retrying, attempt {attempt}, sleeping {total_sleep_sec:.2f} s)')
                time.sleep(total_sleep_sec)
        # NOTE(review): this post-loop call makes one extra, unguarded attempt
        # (retry_num + 1 calls in total); any exception here propagates uncaught.
        return wrapped(*args, **kwargs)
    # Apply the wrapt wrapper to the target function and return the proxy.
    return wrapper(wrapped)
class FeatureLayerDataFetcher:
    """Fetch every row of a hosted feature layer/table in fixed-size pages,
    running the page queries in parallel with Dask's threaded scheduler."""

    def __init__(self, gis, url):
        """
        Initialize the FeatureLayerDataFetcher with an existing GIS object and the URL of the feature layer.

        :param gis: An authenticated arcgis.gis.GIS object.
        :param url: The URL of the feature layer.
        """
        self.gis = gis
        self.url = url
        self.loading_msg = 'Fetching Layers in parallel with Dask: '

    def get_total_rows(self):
        """
        Get the total number of rows in the feature layer.

        :return: Total number of rows.
        """
        feature_layer = FeatureLayer(self.url, gis=self.gis)
        # return_count_only=True makes query() return just the count.
        query_result = feature_layer.query(where="1=1", return_count_only=True)
        return query_result

    def fetch_data_in_batches(self, batch_size=1000):
        """
        Fetch data from the feature layer in batches of a specified size using Dask for parallelization.

        :param batch_size: The number of rows to fetch in each batch. Default is 1000.
        :return: Tuple of DataFrames, one per batch, containing all fetched data.
        :raises DaskFetcherError: if computing the batch tasks fails.
        """
        # Get the total number of rows
        total_rows = self.get_total_rows()
        # NOTE(review): the `total_rows+1` upper bound adds a final offset equal to
        # total_rows whenever total_rows is a multiple of batch_size, yielding one
        # trailing empty batch — confirm this is intended.
        offsets = range(0, total_rows+1, batch_size)
        # Create a list of delayed objects for each batch.
        # NOTE(review): get_data_batch is already decorated with @delayed at class
        # level — that is also why `self` must be passed explicitly here (a Delayed
        # object is not a descriptor, so attribute access does not bind it). Wrapping
        # it in delayed(...) again nests one Delayed inside another; verify the
        # computation graph behaves as intended.
        tasks = [delayed(self.get_data_batch)(self, offset, batch_size) for offset in offsets]
        try:
            print(self.loading_msg)
            with ProgressBar():
                # Threads are appropriate: each task is HTTP/IO-bound.
                results = compute(*tasks, scheduler='threads', optimize_graph=False)
        except Exception as e:
            print(e)
            raise DaskFetcherError('Error fetching data in batches')
        return results

    @delayed
    @retry_decorator
    def get_data_batch(self, offset, batch_size):
        """
        Fetch a batch of data from the feature layer.

        :param offset: The starting offset for the batch.
        :param batch_size: The number of rows to fetch in this batch.
        :return: Pandas DataFrame of the fetched data (empty DataFrame for an empty page).
        :raises DaskFetcherError: if the query fails (after retry_decorator's retries).
        """
        try:
            fl = FeatureLayer(self.url, gis=self.gis)
            # Explicit paging: disable return_all_records and request exactly one
            # window of rows, so server-side auto-paging is not involved.
            query_result = fl.query(where="1=1",
                                    out_fields="*",
                                    return_all_records=False,
                                    result_offset=offset,
                                    result_record_count=batch_size,
                                    return_exceeded_limit_features=False)
            df = query_result.sdf
            if not df.empty:
                # Rename columns based on aliases
                columns_alias = {f.name: f.alias for f in fl.properties.fields}
                df = df.rename(columns=columns_alias)
                return df
            return pd.DataFrame([])
        except Exception as e:
            print(e)
            raise DaskFetcherError(f'Error fetching batch at offset {offset}')
# Second approach: fetch the same table in explicit 1000-row pages via Dask.
fetcher = FeatureLayerDataFetcher(gis, table_url)
total_rows = fetcher.get_total_rows()
print(f'total_rows: {total_rows}')
try:
    data_batches = fetcher.fetch_data_in_batches()
    # Concatenate all per-batch DataFrames into a single DataFrame.
    df3 = pd.concat(data_batches, ignore_index=True)
    # Per the transcript below, this path returned the full 100000 rows
    # even though one batch failed once and was retried.
    print(f'should be = {num_rows}')
    print(f"df3: {df3.ObjectId.count()}")
except DaskFetcherError as e:
    print(e.message)
class NExceptionTimeoutHTTPAdapter(HTTPAdapter):
    """Transport adapter that randomly fabricates 504 responses.

    Mounting this on a requests session simulates an unstable server so
    that client-side paging/retry behaviour can be exercised.
    """

    def __init__(self, fail_probability=0.1, *args, **kwargs):
        """
        HTTP Adapter that randomly fails with a 504 Gateway Timeout error
        with a specified probability.

        :param fail_probability: Probability of simulating a 504 error (0 <= fail_probability <= 1).
        """
        super().__init__(*args, **kwargs)
        self.fail_probability = fail_probability

    def send(self, request, *args, **kwargs):
        """Dispatch the request, injecting a fake 504 with probability fail_probability."""
        # Most of the time: no simulated failure, delegate to the real transport.
        if random.random() >= self.fail_probability:
            return super().send(request, *args, **kwargs)
        # Fabricate a 504 Gateway Timeout response without touching the network.
        fake_response = Response()
        fake_response.status_code = 504
        fake_response._content = b'''
{
    "success": false,
    "error": {
        "code": 504,
        "message": "HTTP Error 504: GATEWAY_TIMEOUT",
        "details": ["The service was not able to fulfill the request, possibly due to invalid input, or the service may not be functioning properly."]
    }
}
'''
        fake_response.url = request.url
        return fake_response
# Third approach: mount the randomly-failing adapter on the GIS session so
# roughly 10% of HTTP calls return a fabricated 504, then repeat the plain query.
print('testing unstable server')
session = gis._con._session  # NOTE(review): private attribute of the arcgis connection object
adapter = NExceptionTimeoutHTTPAdapter()
session.mount('http://', adapter)
session.mount('https://', adapter)
# Per the transcript below, this run printed df2: 2000 — far short of num_rows —
# demonstrating how the auto-paging query behaves against a flaky transport.
df2 = fl.query().sdf
print(f'should be = {num_rows}')
print(f"df2: {df2.ObjectId.count()}")
if __name__ == '__main__':
    print('end')

Output of the script:

creating fake data
publishing fake content
-------starting-----
should be = 100000
df1: 99000
total_rows: 100000
Fetching Layers in parallel with Dask:
[####################################### ] | 99% Completed | 12.22 s
An error occurred. (Error Code: 503)
def get_data_batch falhou
Retrying, attempt 1, sleeping 10.00 s)
[########################################] | 100% Completed | 24.92 s
should be = 100000
df3: 100000
testing unstable server
should be = 100000
df2: 2000
end

I would look into it. PS: parallel processing is disabled in my real code because it would require more work — you use lazy loading, and the code I used as a template won't work with lazy loading. But one can get the idea. The query is not working properly... or so it seems; I might be doing something wrong. I just want to call fl.query().sdf and get all the data. |
Describe the bug
A clear and concise description of what the bug is.
To Reproduce
Steps to reproduce the behavior:
error:
# No error, but there are more rows than actually exist in the data.
Screenshots
Expected behavior
Check for duplicated data before inserting.
Platform (please complete the following information):
Additional context
Add any other context about the problem here, attachments etc.
The text was updated successfully, but these errors were encountered: