"""
Define the game engine
"""
from typing import Optional
from constants import *
from themes.current_theme import *
from entities.stairs import Stairs
from entities.inventory import Inventory
from entities.entity import Entity
from entities.fighter import Fighter
from procedural_generation.game_map import GameMap
from recalculate_fov import recalculate_fov
from get_blocking_sprites import get_blocking_sprites
from map_to_sprites import map_to_sprites
from map_to_sprites import creatures_to_sprites
from entities.restore_entity import restore_entity
class GameLevel:
    """
    Holds the sprite lists and the depth number that make up one dungeon level.
    """

    def __init__(self):
        """ Create an empty level; nothing is populated until the engine fills it in. """
        # Depth of this level; 0 means "not assigned yet"
        self.level: int = 0

        # Sprite lists, all unset until a map is generated or restored
        self.dungeon_sprites: Optional[arcade.SpriteList] = None
        self.entities: Optional[arcade.SpriteList] = None
        self.creatures: Optional[arcade.SpriteList] = None
[docs]class GameEngine:
"""
This is the main game engine class, that manages the game and its actions.
"""
def __init__(self):
    """ Create the engine with empty game state and load every sound effect. """
    # --- Sprites, levels and map (populated by setup / setup_level)
    self.characters: Optional[arcade.SpriteList] = None
    self.levels = []
    self.cur_level_index = 0
    self.cur_level = None
    self.player: Optional[Entity] = None
    self.game_map: Optional[GameMap] = None

    # --- Turn processing and UI state
    self.messages = []
    self.action_queue = []
    self.selected_item: Optional[int] = None
    self.game_state = STATE.NORMAL
    self.grid_select_handlers = []

    # --- Sound effects: (attribute name, file) pairs, loaded in this order
    sound_table = (
        ("walk_sound", "sounds/footstep_concrete_002.ogg"),
        ("player_hit_monster_sound", "sounds/impactPunch_heavy_004.ogg"),
        ("monster_attack_sound", "sounds/impactPunch_heavy_001.ogg"),
        ("get_scroll_sound", "sounds/bookFlip2.ogg"),
        ("get_potion_sound", "sounds/sinkWater1.ogg"),
        ("level_up_sound", "sounds/powerUp1.ogg"),
        ("monster_death", "sounds/knifeSlice.ogg"),
        ("monster_walk_sound", "sounds/footstep04.ogg"),
        ("pickup_potion_sound", "sounds/sinkWater1.ogg"),
        ("pickup_scroll_sound", "sounds/bookFlip2.ogg"),
        ("error_sound", "sounds/error5.ogg"),
        ("heal_sound", "sounds/secret4.ogg"),
    )
    for attribute_name, sound_path in sound_table:
        setattr(self, attribute_name, arcade.load_sound(sound_path))
def setup(self):
    """
    Set up the game here. Call this function to restart the game.

    Resets the per-game state, creates a fresh player, and generates
    level 1, which becomes the current (and only) level.
    """
    # Set game state. Everything below is reset so that a restart does not
    # inherit levels, queued actions, or selections from the previous game.
    self.game_state = STATE.NORMAL
    self.levels = []
    self.cur_level_index = 0
    self.action_queue = []
    self.selected_item = None
    self.grid_select_handlers = []

    # Create sprite lists
    self.characters = arcade.SpriteList()

    # Create player
    fighter_component = Fighter(hp=30, defense=2, power=5, level=1)
    self.player = Entity(
        x=0,
        y=0,
        texture_id=PLAYER_TEXTURE_ID,
        color=colors['player'],
        fighter=fighter_component,
        name="Player",
        inventory=Inventory(capacity=5),
    )
    self.characters.append(self.player)

    # Generate the first level and make it current
    self.cur_level = self.setup_level(1)
    self.levels.append(self.cur_level)
def setup_level(self, level_number: int) -> GameLevel:
    """
    Generate a new map and wrap its contents in a GameLevel.

    The freshly built map replaces ``self.game_map``. The returned level is
    not added to ``self.levels``; that is left to the caller.

    :param level_number: Number passed to the map generator and stored on the level.
    :return: The newly created level.
    """
    new_level = GameLevel()
    new_level.level = level_number

    # --- Create map, sized by the MAP_WIDTH / MAP_HEIGHT constants
    self.game_map = GameMap(MAP_WIDTH, MAP_HEIGHT)
    self.game_map.make_map(player=self.player, level=level_number)

    # --- Turn the generated map data into sprite lists
    new_level.dungeon_sprites = map_to_sprites(self.game_map.tiles)
    new_level.entities = map_to_sprites(self.game_map.entities)
    new_level.creatures = creatures_to_sprites(self.game_map.creatures)

    # --- Set field of view around the player's current position
    fov_sprite_lists = [
        new_level.dungeon_sprites,
        new_level.entities,
        new_level.creatures,
    ]
    recalculate_fov(self.player.x, self.player.y, FOV_RADIUS, fov_sprite_lists)

    return new_level
def get_dict(self):
    """
    Get a dictionary object for the entire game. Used in serializing
    the game state for saving to disk or sending over the network.

    :return: ``{'player': ..., 'levels': [...]}``. Each level entry holds the
             serialized ``'dungeon'``, ``'entities'`` and ``'creatures'``
             sprites plus the level number under ``'level'``.
    """

    def get_entity_dict(entity: "Entity"):
        # Key each record by its class name so the matching class can be
        # chosen again when the data is read back.
        return {type(entity).__name__: entity.get_dict()}

    player_dict = get_entity_dict(self.player)

    levels_dict = []
    for level in self.levels:
        levels_dict.append({
            'dungeon': [get_entity_dict(sprite) for sprite in level.dungeon_sprites],
            'entities': [get_entity_dict(sprite) for sprite in level.entities],
            'creatures': [get_entity_dict(sprite) for sprite in level.creatures],
            # Store the level number too; without it a restored level
            # cannot know which depth it represents.
            'level': level.level,
        })

    return {'player': player_dict, 'levels': levels_dict}
def restore_from_dict(self, data: dict):
    """
    Restore this object from a dictionary object. Used in recreating a game from a
    saved state, or from over the network.

    The restored levels replace the current ``self.levels`` (rather than being
    appended to it), and the last restored level becomes the current level.
    If ``data`` contains no levels, the existing levels are left untouched.

    :param data: Dictionary with a ``'player'`` entry and a ``'levels'`` list,
                 in the layout produced by ``get_dict``.
    """

    def restore_sprite_list(entity_dicts):
        # Rebuild one sprite list from its serialized entities
        sprites = arcade.SpriteList(
            use_spatial_hash=True, spatial_hash_cell_size=16
        )
        for entity_dict in entity_dicts:
            sprites.append(restore_entity(entity_dict))
        return sprites

    player_dict = data['player']
    self.player.restore_from_dict(player_dict['Entity'])

    restored_levels = []
    for position, level_dict in enumerate(data['levels'], start=1):
        level = GameLevel()
        level.dungeon_sprites = restore_sprite_list(level_dict['dungeon'])
        level.entities = restore_sprite_list(level_dict['entities'])
        level.creatures = restore_sprite_list(level_dict['creatures'])
        # Data without a 'level' key falls back to the position in the
        # list, since levels are appended in the order they are generated.
        level.level = level_dict.get('level', position)
        restored_levels.append(level)

    if restored_levels:
        # Replace instead of append, so levels that existed before the
        # restore (e.g. the one built by setup()) do not linger in the list.
        self.levels = restored_levels
        self.cur_level = restored_levels[-1]
def grid_click(self, grid_x, grid_y):
    """
    Dispatch a grid click to every registered grid-select handler.

    Whatever actions a handler returns are queued, and the handler list is
    emptied afterwards, so each handler fires for one click only.

    :param grid_x: Grid column that was clicked.
    :param grid_y: Grid row that was clicked.
    """
    for handler in self.grid_select_handlers:
        outcome = handler(grid_x, grid_y)
        # A falsy outcome (None, empty list) contributes nothing
        self.action_queue.extend(outcome or ())

    # Handlers are one-shot: forget them all
    self.grid_select_handlers = []
def move_player(self, cx: int, cy: int):
    """
    Try to move the player by a grid offset, attacking instead if a creature
    blocks the destination.

    :param cx: Offset added to the player's x grid position.
    :param cy: Offset added to the player's y grid position.
    """
    # Destination cell
    dest_x = self.player.x + cx
    dest_y = self.player.y + cy

    # Whatever blocks that cell: dungeon sprites and creatures are checked separately
    walls = get_blocking_sprites(dest_x, dest_y, self.cur_level.dungeon_sprites)
    monsters = get_blocking_sprites(dest_x, dest_y, self.cur_level.creatures)

    if not (walls or monsters):
        # Free cell: step into it
        self.player.x += cx
        self.player.y += cy
        self.walk_sound.play()

        # The view changes with the new position
        recalculate_fov(
            self.player.x,
            self.player.y,
            FOV_RADIUS,
            [self.cur_level.dungeon_sprites, self.cur_level.creatures, self.cur_level.entities],
        )

        # Moving uses up the turn, so the enemies go next
        self.action_queue.append({"enemy_turn": True})
        return

    if not monsters:
        # Blocked by the dungeon only: nothing happens
        return

    # A creature is in the way: attack the first one found
    target = monsters[0]
    if target.fighter and not target.is_dead:
        attack_results = self.player.fighter.attack(target)
        arcade.play_sound(self.player_hit_monster_sound)
        self.action_queue.extend(attack_results)
        # NOTE(review): the enemy turn is queued only when an attack actually
        # happened - confirm this matches the intended turn rules.
        self.action_queue.append({"enemy_turn": True})
def move_enemies(self):
    """
    Give every creature on the current level that has an AI a turn.

    :return: All actions returned by the creatures' turns, in creature order.
    """
    collected_actions = []
    for creature in self.cur_level.creatures:
        if not creature.ai:
            # No AI attached: this creature does not act
            continue
        turn_actions = creature.ai.take_turn(
            target=self.player,
            sprite_lists=[self.cur_level.dungeon_sprites, self.cur_level.creatures],
        )
        collected_actions.extend(turn_actions)
    return collected_actions
def dying(self, target: Entity) -> list:
    """
    Mark an entity as dead and build the follow-up actions.

    :param target: The entity that is dying.
    :return: A message action; for anything other than the player, also a
             delayed ``"dead"`` action that fires after ``DEATH_DELAY``.
    """
    target.color = colors["dying"]
    target.is_dead = True

    if target is self.player:
        return [{"message": "Player has died!"}]

    # Monsters get a death message plus a delayed "dead" action
    death_message = {"message": f"{target.name} has been killed!"}
    delayed_removal = {"delay": {"time": DEATH_DELAY, "action": {"dead": target}}}
    return [death_message, delayed_removal]
def use_stairs(self):
    """
    Descend to a newly generated level if the player is standing on stairs.

    :return: A one-element list holding a message action describing the outcome.
    """
    # Dungeon sprites sitting exactly on the player's position
    sprites_here = arcade.get_sprites_at_exact_point(
        self.player.position, self.cur_level.dungeon_sprites
    )

    standing_on_stairs = any(isinstance(sprite, Stairs) for sprite in sprites_here)
    if not standing_on_stairs:
        return [{"message": "There are no stairs here"}]

    # Generate the next level down, make it current and remember it
    next_level = self.setup_level(self.cur_level.level + 1)
    self.cur_level = next_level
    self.levels.append(next_level)
    return [{"message": "You went down a level."}]
def pick_up(self):
    """
    Try to pick up an item lying on the player's position.

    The first sprite found there that is an item is handed to the player's
    inventory, and whatever the inventory returns is passed back.

    :return: The result of ``inventory.add_item`` for the first item found,
             or ``None`` when no item is at this position.
    :raises ValueError: If a sprite at this position is not an ``Entity``.
    """
    # Everything from the level's entity list at the player's exact position
    found = arcade.get_sprites_at_exact_point(
        self.player.position, self.cur_level.entities
    )
    print(f"There are {len(found)} items")

    for candidate in found:
        # Checked explicitly so the type-checker knows this is an Entity
        if not isinstance(candidate, Entity):
            raise ValueError("Sprite is not an instance of Entity.")

        if not candidate.item:
            print(f"Can't get {candidate.name}")
            continue

        # Try and get it. (Inventory might be full.)
        return self.player.inventory.add_item(candidate)

    return None
def check_experience_level(self):
    """
    Level the player up by one if enough experience has been gathered.

    On level-up the fighter gains one ability point and one level, a
    "Level up!!!" message is queued and the level-up sound is played.
    """
    fighter = self.player.fighter

    # NOTE(review): with this bound the last EXPERIENCE_PER_LEVEL entry is
    # never consulted - confirm that is intended.
    if fighter.level >= len(EXPERIENCE_PER_LEVEL):
        return

    # The table is 0-based while fighter levels start at 1
    xp_needed = EXPERIENCE_PER_LEVEL[fighter.level - 1]
    if fighter.current_xp < xp_needed:
        return

    fighter.ability_points += 1
    fighter.level += 1
    self.action_queue.append({"message": "Level up!!!"})
    arcade.play_sound(self.level_up_sound)
def process_action_queue(self, delta_time: float):
    """
    Process the action queue, kind of a dispatch-center for the game.

    Every queued action is a dict. Each recognised key in it is handled in
    turn (one action may carry several keys). Follow-up actions produced
    while processing are collected in a new list, which replaces
    ``self.action_queue`` when the pass is finished.

    Recognised keys: ``enemy_turn``, ``message``, ``dying``, ``dead``,
    ``delay``, ``pickup``, ``select_item``, ``play_sound``, ``use_item``,
    ``drop_item`` and ``use_stairs``.

    :param delta_time: Time subtracted from the timer of every pending
                       ``delay`` action on this pass.
    """
    # Sound trigger name -> attribute holding the loaded sound
    sound_attributes = {
        "monster_walk": "monster_walk_sound",
        "monster_attack": "monster_attack_sound",
        "pickup_potion": "pickup_potion_sound",
        "pickup_scroll": "pickup_scroll_sound",
        "heal": "heal_sound",
        "error": "error_sound",
    }

    new_action_queue = []
    for action in self.action_queue:
        if "enemy_turn" in action:
            new_actions = self.move_enemies()
            if new_actions:
                new_action_queue.extend(new_actions)

        if "message" in action:
            print(action["message"])
            self.messages.append(action["message"])

        if "dying" in action:
            target = action["dying"]
            new_actions = self.dying(target)
            arcade.play_sound(self.monster_death)
            if new_actions:
                new_action_queue.extend(new_actions)

        if "dead" in action:
            # Turn the entity into a non-blocking dead body
            target = action["dead"]
            target.texture_id = DEAD_BODY_TEXTURE_ID
            target.color = colors["dead_body"]
            target.visible_color = colors["dead_body"]
            target.blocks = False
            if target is not self.player:
                self.player.fighter.current_xp += target.fighter.xp_reward

        if "delay" in action:
            # Count the timer down; re-queue until it expires, then queue
            # the wrapped action itself.
            delay = action["delay"]
            delay["time"] -= delta_time
            if delay["time"] > 0:
                new_action_queue.append({"delay": delay})
            else:
                new_action_queue.append(delay["action"])

        if "pickup" in action:
            new_actions = self.pick_up()
            if new_actions:
                new_action_queue.extend(new_actions)

        if "select_item" in action:
            item_number = action["select_item"]
            if 1 <= item_number <= self.player.inventory.capacity:
                # Fix up for 0 based index
                if self.selected_item != item_number - 1:
                    self.selected_item = item_number - 1
                    # Queue a real action dict. Extending the list with the
                    # dict itself would only add its key, the bare string
                    # "enemy_turn".
                    # NOTE(review): changing the selection costs a turn -
                    # confirm that is the intended rule.
                    new_action_queue.append({"enemy_turn": True})

        if "play_sound" in action:
            trigger = action["play_sound"]
            attribute_name = sound_attributes.get(trigger)
            if attribute_name is None:
                print(f"Warning, unknown sound trigger {trigger}.")
            else:
                arcade.play_sound(getattr(self, attribute_name))

        if "use_item" in action:
            item_number = self.selected_item
            if item_number is not None:
                item = self.player.inventory.get_item_number(item_number)
                if item:
                    results = item.use(self)
                    if results:
                        new_action_queue.extend(results)

        if "drop_item" in action:
            item_number = self.selected_item
            if item_number is not None:
                item = self.player.inventory.get_item_number(item_number)
                if item:
                    self.player.inventory.remove_item_number(item_number)
                    # The item goes back onto the current level's entity
                    # list (the engine itself has no `entities` attribute),
                    # at the player's position.
                    self.cur_level.entities.append(item)
                    item.center_x = self.player.center_x
                    item.center_y = self.player.center_y
                    new_action_queue.append(
                        {"message": f"You dropped the {item.name}."}
                    )

        if "use_stairs" in action:
            result = self.use_stairs()
            if result:
                new_action_queue.extend(result)

    # Reload the action queue with new items
    self.action_queue = new_action_queue